avwx.current.pirep
A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid controllers and pilots routing around adverse conditions and other conditions of note. They typically contain icing, turbulence, cloud types/bases/tops, and other info at a known distance and radial from a ground station. They are released as they come in.
"""
A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid
controllers and pilots routing around adverse conditions and other conditions
of note. They typically contain icing, turbulence, cloud types/bases/tops, and
other info at a known distance and radial from a ground station. They are
released as they come in.
"""

# pylint: disable=too-many-boolean-expressions

# stdlib
from contextlib import suppress
from datetime import date
from typing import List, Optional, Tuple, Union

# module
from avwx import exceptions
from avwx.current.base import Reports, get_wx_codes
from avwx.parsing import core
from avwx.parsing.sanitization.pirep import clean_pirep_string
from avwx.service.scrape import NOAA_ScrapeList
from avwx.static.core import CARDINALS, CLOUD_LIST
from avwx.structs import (
    Aircraft,
    Cloud,
    Code,
    Coord,
    Icing,
    Location,
    Number,
    PirepData,
    Sanitization,
    Timestamp,
    Turbulence,
    Units,
)


class Pireps(Reports):
    """
    The Pireps class offers an object-oriented approach to managing multiple
    PIREP reports for a single station.

    Below is typical usage for fetching and pulling PIREP data for KMCO.

    ```python
    >>> from avwx import Pireps
    >>> kmco = Pireps("KMCO")
    >>> kmco.station.name
    'Orlando International Airport'
    >>> kmco.update()
    True
    >>> kmco.last_updated
    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
    >>> kmco.raw[0]
    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
    >>> kmco.data[0].location
    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process.
    """

    # Parsed reports and their sanitization logs, rebuilt by the _post_* hooks
    data: Optional[List[Optional[PirepData]]] = None  # type: ignore
    sanitization: Optional[List[Optional[Sanitization]]] = None  # type: ignore

    def __init__(self, code: Optional[str] = None, coord: Optional[Coord] = None):
        """Initializes the base Reports object and sets the PIREP list service"""
        super().__init__(code, coord)
        self.service = NOAA_ScrapeList("pirep")

    @staticmethod
    def _report_filter(reports: List[str]) -> List[str]:
        """Removes AIREPs before updating raw_reports

        Any report whose text starts with "ARP" is dropped.
        """
        return [r for r in reports if not r.startswith("ARP")]

    async def _post_update(self) -> None:
        """Rebuilds data and sanitization from the raw reports

        Exceptions raised while parsing a report are handed to
        `exceptions.exception_intercept` along with the raw report.
        """
        self.data, self.sanitization = [], []
        if self.raw is None:
            return
        for report in self.raw:
            try:
                data, sans = parse(report, issued=self.issued)
                self.data.append(data)
                self.sanitization.append(sans)
            except Exception as exc:  # pylint: disable=broad-except
                # NOTE(review): if the interceptor returns instead of raising,
                # this report gets no entry, so data/sanitization indexes can
                # drift from raw. Confirm this is intended.
                exceptions.exception_intercept(exc, raw=report)  # type: ignore

    def _post_parse(self) -> None:
        """Rebuilds data and sanitization from the raw reports

        Unlike `_post_update`, parsing exceptions propagate to the caller.
        """
        self.data, self.sanitization = [], []
        if self.raw is None:
            return
        for report in self.raw:
            data, sans = parse(report, issued=self.issued)
            self.data.append(data)
            self.sanitization.append(sans)

    @staticmethod
    def sanitize(report: str) -> str:
        """Sanitizes a PIREP string"""
        return sanitize(report)[0]


_UNITS = Units.north_american()


def _root(item: str) -> Tuple[Optional[str], Optional[str]]:
    """Parses report root data including station and report type

    The report type is the last "UA" or "UUA" token found. The station is the
    first other token. Returns (station, report_type); either may be None.
    """
    # pylint: disable=redefined-argument-from-local
    report_type = None
    station = None
    for item in item.split():
        if item in ("UA", "UUA"):
            report_type = item
        elif not station:
            station = item
    return station, report_type


def _location(item: str) -> Optional[Location]:
    """Convert a location element to a Location object

    The filler words "MILES" and "OF" are removed (first occurrence of each)
    before matching. Returns None when nothing remains. Otherwise returns a
    Location whose repr is the full original element; station, direction, and
    distance stay None when the element matches none of the known layouts.
    """
    items = item.split()
    for target in ("MILES", "OF"):
        with suppress(ValueError):
            items.remove(target)
    if not items:
        return None
    station, direction, distance = None, None, None
    direction_number, distance_number = None, None
    if len(items) == 1:
        ilen = len(item)
        # MLB
        if ilen < 5:
            station = item
        # MKK360002 or KLGA220015
        elif ilen in {9, 10} and item[-6:].isdigit():
            station, direction, distance = item[:-6], item[-6:-3], item[-3:]
    # 10 WGON
    # 10 EAST
    # 15 SW LRP
    elif items[0].isdigit():
        if items[1] in CARDINALS:
            distance, direction = items[0], items[1]
            if len(items) == 3:
                station = items[2]
        else:
            # Direction letters are fused onto a three character station
            station, direction, distance = items[1][-3:], items[1][:-3], items[0]
    # GON 270010
    elif items[1].isdigit():
        station, direction, distance = items[0], items[1][:3], items[1][3:]
    # Convert non-null elements
    if direction:
        direction_number = core.make_number(direction, literal=True)
    if distance:
        distance_number = core.make_number(distance)
    return Location(item, station, direction_number, distance_number)


def _time(item: str, target: Optional[date] = None) -> Optional[Timestamp]:
    """Convert a time element to a Timestamp

    Delegates to `core.make_timestamp` with `time_only=True`, passing `target`
    as the target date.
    """
    return core.make_timestamp(item, time_only=True, target_date=target)


def _altitude(item: str) -> Union[Number, str, None]:
    """Convert reporting altitude to a Number or string

    All-digit values go through `core.make_number`; anything else is returned
    as the original string. Falsy results, such as an empty string, become None.
    """
    alt = core.make_number(item) if item.isdigit() else item
    return alt or None


def _aircraft(item: str) -> Union[Aircraft, str]:
    """Returns the Aircraft from the ICAO code

    Falls back to the original string when `Aircraft.from_icao` raises
    ValueError.
    """
    try:
        return Aircraft.from_icao(item)
    except ValueError:
        return item


def _non_digit_cloud(cloud: str) -> Tuple[Optional[str], str]:
    """Returns cloud type and altitude for non-digit TOPS BASES cloud elements

    The cloud type is None when the element carries only an altitude.
    """
    # 5000FT
    if cloud.endswith("FT"):
        # Drops the last four characters: "FT" and the two before it
        cloud = cloud[:-4]
        if cloud.isdigit():
            return None, cloud
    if "-" not in cloud:
        return cloud[:3], cloud[3:]
    # SCT030-035
    parts = cloud.split("-")
    return (None, parts[-1]) if parts[0].isdigit() else (parts[0][:3], parts[-1])


def _clouds(item: str) -> List[Cloud]:
    """Convert cloud element to a list of Clouds

    When both "BASES" and "TOPS" are present, a single Cloud is returned built
    from the token following each keyword. Otherwise every token is passed to
    `core.make_cloud`.
    """
    clouds = item.replace(",", "").split()
    # BASES 004 TOPS 016
    # BASES SCT030 TOPS SCT058
    if "BASES" in clouds and "TOPS" in clouds:
        cloud_type = None
        base = clouds[clouds.index("BASES") + 1]
        top = clouds[clouds.index("TOPS") + 1]
        if not base.isdigit():
            cloud_type, base = _non_digit_cloud(base)
        if not top.isdigit():
            # The type found on the top value replaces any found on the base
            cloud_type, top = _non_digit_cloud(top)
        return [Cloud(item, cloud_type, base=int(base), top=int(top))]
    return [core.make_cloud(cloud) for cloud in clouds]


def _number(item: str) -> Optional[Number]:
    """Convert an element to a Number

    Leading and trailing "C", "F", and space characters are stripped first.
    Returns None if the remaining value still contains a space.
    """
    value = item.strip("CF ")
    return None if " " in value else core.make_number(value, item)


def _separate_floor_ceiling(item: str) -> Tuple[Optional[Number], Optional[Number]]:
    """Extract floor and ceiling numbers from hyphen string

    The two values are swapped when the first is greater than the second, so
    the floor is never above the ceiling. Raises ValueError if `item` does not
    contain exactly one hyphen.
    """
    floor_str, ceiling_str = item.split("-")
    floor = core.make_number(floor_str)
    ceiling = core.make_number(ceiling_str)
    if (
        floor
        and ceiling
        and floor.value
        and ceiling.value
        and floor.value > ceiling.value
    ):
        return ceiling, floor
    return floor, ceiling


def _find_floor_ceiling(
    items: List[str],
) -> Tuple[List[str], Optional[Number], Optional[Number]]:
    """Extracts the floor and ceiling from item list

    Only the first altitude expression found is used. The altitude tokens are
    removed from the returned list so they do not leak into severity or type
    text. Note that the passed list may be mutated in place.

    Returns (remaining items, floor, ceiling). Floor and ceiling are None when
    no altitude expression is found.
    """
    floor: Optional[Number] = None
    ceiling: Optional[Number] = None

    for i, item in enumerate(items):
        hloc = item.find("-")
        # TRACE RIME 070-090
        if hloc > -1 and item[:hloc].isdigit() and item[hloc + 1 :].isdigit():
            floor, ceiling = _separate_floor_ceiling(items.pop(i))
            break
        # CONT LGT CHOP BLO 250
        if item == "BLO":
            # A trailing BLO has no altitude to read; leave the items untouched
            if i + 1 >= len(items):
                break
            altitude = items[i + 1]
            if "-" in altitude:
                floor, ceiling = _separate_floor_ceiling(altitude)
            else:
                ceiling = core.make_number(altitude)
            items = items[:i]
            break
        # LGT RIME 025
        if item.isdigit():
            # Remove the altitude so it is not reported as severity or type
            num = core.make_number(items.pop(i))
            floor, ceiling = num, num
            break
    return items, floor, ceiling


def _turbulence(item: str) -> Turbulence:
    """Convert reported turbulence to a Turbulence object

    Severity is the space-joined text left after altitude extraction.
    """
    items, floor, ceiling = _find_floor_ceiling(item.split())
    return Turbulence(
        severity=" ".join(items),
        floor=floor,
        ceiling=ceiling,
    )


def _icing(item: str) -> Icing:
    """Convert reported icing to an Icing object

    After altitude extraction, the first remaining token is the severity and
    the next one, if any, is the icing type.
    """
    items, floor, ceiling = _find_floor_ceiling(item.split())
    severity = items.pop(0) if items else ""
    return Icing(
        severity=severity,
        floor=floor,
        ceiling=ceiling,
        type=items[0] if items else None,
    )


def _remarks(item: str) -> str:
    """Returns the remarks. Reserved for later parsing"""
    return item


def _wx(item: str) -> Tuple[List[Code], Optional[Number], List[str]]:
    """Parses remaining weather elements

    Tokens longer than two characters that start with "FV" are treated as
    flight visibility. All other tokens are passed to `get_wx_codes`.

    Returns (weather codes, flight visibility, other tokens).
    """
    # pylint: disable=redefined-argument-from-local
    other: List[str] = []
    flight_visibility = None
    for item in item.split():
        if len(item) > 2 and item.startswith("FV"):
            _, flight_visibility = core.get_visibility([item[2:]], _UNITS)
        else:
            other.append(item)
    other, wx_codes = get_wx_codes(other)
    return wx_codes, flight_visibility, other


def _sanitize_report_list(data: List[str], sans: Sanitization) -> List[str]:
    """Fixes report elements based on neighbor values

    Joins cloud tokens that were split by a space, records each change on
    `sans`, then removes neighboring duplicates. `data` is mutated in place.
    """
    # Walk backwards so popping an element never shifts an index still to visit
    for i, item in reversed(list(enumerate(data))):
        # Fix spaced cloud top Ex: BKN030 TOP045   BASE020 TOPS074
        # But not BASES SCT014 TOPS SCT021
        if (
            item.startswith("TOP")
            and item != "TOPS"
            and i > 0
            and len(data[i - 1]) >= 6
            and (data[i - 1][:3] in CLOUD_LIST or data[i - 1].startswith("BASE"))
        ):
            key = f"{data[i-1]} {item}"
            data[i - 1] += f"-{data.pop(i)}"
            sans.log(key, data[i - 1])
        # Fix separated clouds Ex: BASES OVC 049 TOPS 055
        elif item in CLOUD_LIST and i + 1 < len(data) and data[i + 1].isdigit():
            data[i] = item + data.pop(i + 1)
            sans.extra_spaces_found = True
    deduped = core.dedupe(data, only_neighbors=True)
    if len(data) != len(deduped):
        sans.duplicates_found = True
    return deduped


def sanitize(report: str) -> Tuple[str, Sanitization]:
    """Returns a sanitized report ready for parsing

    The second tuple element is the Sanitization object populated while
    cleaning the report.
    """
    sans = Sanitization()
    clean = clean_pirep_string(report, sans)
    data = _sanitize_report_list(clean.split(), sans)
    return " ".join(data), sans


def parse(
    report: str, issued: Optional[date] = None
) -> Tuple[Optional[PirepData], Optional[Sanitization]]:
    """Returns a PirepData object based on the given report

    The sanitized report is split on "/". The first piece is the root (station
    and report type); each remaining piece starts with a two character tag
    which selects its parser. Unknown tags are ignored, and a repeated tag
    overwrites the earlier value.

    Returns (None, None) for an empty report.
    """
    # pylint: disable=too-many-locals,too-many-branches
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    data = sanitized.split("/")
    station, report_type = _root(data.pop(0).strip())
    time, location, altitude, aircraft = None, None, None, None
    clouds, temperature, turbulence, other = None, None, None, None
    icing, remarks, flight_visibility, wx_codes = None, None, None, None
    for item in data:
        # Too short to hold a tag; this also covers empty pieces
        if len(item) < 2:
            continue
        tag = item[:2]
        item = item[2:].strip()
        if tag == "TM":
            time = _time(item, issued)
        elif tag == "OV":
            location = _location(item)
        elif tag == "FL":
            altitude = _altitude(item)
        elif tag == "TP":
            aircraft = _aircraft(item)
        elif tag == "SK":
            clouds = _clouds(item)
        elif tag == "TA":
            temperature = _number(item)
        elif tag == "TB":
            turbulence = _turbulence(item)
        elif tag == "IC":
            icing = _icing(item)
        elif tag == "RM":
            remarks = _remarks(item)
        elif tag == "WX":
            wx_codes, flight_visibility, other = _wx(item)
    return (
        PirepData(
            aircraft=aircraft,
            altitude=altitude,
            clouds=clouds,
            flight_visibility=flight_visibility,
            icing=icing,
            location=location,
            other=other or [],
            raw=report,
            remarks=remarks,
            sanitized=sanitized,
            station=station,
            temperature=temperature,
            time=time,
            turbulence=turbulence,
            type=report_type,
            wx_codes=wx_codes or [],
        ),
        sans,
    )
class Pireps(Reports):
    """
    The Pireps class offers an object-oriented approach to managing multiple
    PIREP reports for a single station.

    Below is typical usage for fetching and pulling PIREP data for KMCO.

    ```python
    >>> from avwx import Pireps
    >>> kmco = Pireps("KMCO")
    >>> kmco.station.name
    'Orlando International Airport'
    >>> kmco.update()
    True
    >>> kmco.last_updated
    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
    >>> kmco.raw[0]
    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
    >>> kmco.data[0].location
    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process.
    """

    # Parsed reports and their sanitization logs, rebuilt by the _post_* hooks
    data: Optional[List[Optional[PirepData]]] = None  # type: ignore
    sanitization: Optional[List[Optional[Sanitization]]] = None  # type: ignore

    def __init__(self, code: Optional[str] = None, coord: Optional[Coord] = None):
        """Initializes the base Reports object and sets the PIREP list service"""
        super().__init__(code, coord)
        self.service = NOAA_ScrapeList("pirep")

    @staticmethod
    def _report_filter(reports: List[str]) -> List[str]:
        """Removes AIREPs before updating raw_reports

        Any report whose text starts with "ARP" is dropped.
        """
        return [r for r in reports if not r.startswith("ARP")]

    async def _post_update(self) -> None:
        """Rebuilds data and sanitization from the raw reports

        Exceptions raised while parsing a report are handed to
        `exceptions.exception_intercept` along with the raw report.
        """
        self.data, self.sanitization = [], []
        if self.raw is None:
            return
        for report in self.raw:
            try:
                data, sans = parse(report, issued=self.issued)
                self.data.append(data)
                self.sanitization.append(sans)
            except Exception as exc:  # pylint: disable=broad-except
                # NOTE(review): if the interceptor returns instead of raising,
                # this report gets no entry, so data/sanitization indexes can
                # drift from raw. Confirm this is intended.
                exceptions.exception_intercept(exc, raw=report)  # type: ignore

    def _post_parse(self) -> None:
        """Rebuilds data and sanitization from the raw reports

        Unlike `_post_update`, parsing exceptions propagate to the caller.
        """
        self.data, self.sanitization = [], []
        if self.raw is None:
            return
        for report in self.raw:
            data, sans = parse(report, issued=self.issued)
            self.data.append(data)
            self.sanitization.append(sans)

    @staticmethod
    def sanitize(report: str) -> str:
        """Sanitizes a PIREP string"""
        return sanitize(report)[0]
The Pireps class offers an object-oriented approach to managing multiple PIREP reports for a single station.
Below is typical usage for fetching and pulling PIREP data for KMCO.
>>> from avwx import Pireps
>>> kmco = Pireps("KMCO")
>>> kmco.station.name
'Orlando International Airport'
>>> kmco.update()
True
>>> kmco.last_updated
datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
>>> kmco.raw[0]
'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
>>> kmco.data[0].location
Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
The `parse` and `from_report` methods can parse a report string if you want to override the normal fetching process.
Pireps( code: Optional[str] = None, coord: Optional[avwx.structs.Coord] = None)
def sanitize(report: str) -> Tuple[str, Sanitization]:
    """Clean a raw PIREP string so it is ready for parsing

    Returns the cleaned report text together with the Sanitization object that
    was populated while cleaning it.
    """
    log = Sanitization()
    # String-level cleanup first, then fixes that depend on neighboring tokens
    tokens = clean_pirep_string(report, log).split()
    return " ".join(_sanitize_report_list(tokens, log)), log
Returns a sanitized report ready for parsing
def parse(report: str, issued: Optional[datetime.date] = None) -> Tuple[Optional[avwx.structs.PirepData], Optional[avwx.structs.Sanitization]]:
def parse(
    report: str, issued: Optional[date] = None
) -> Tuple[Optional[PirepData], Optional[Sanitization]]:
    """Build a PirepData object from a raw PIREP string

    The sanitized report is split on "/". The leading piece holds the station
    and report type; every later piece begins with a two character tag that
    selects its parser. Unknown tags are skipped, and a repeated tag replaces
    the earlier value.

    Returns (None, None) when the report is empty, otherwise the parsed data
    and the Sanitization object produced while cleaning the report.
    """
    # pylint: disable=too-many-locals,too-many-branches
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    root, *elements = sanitized.split("/")
    station, report_type = _root(root.strip())
    aircraft = altitude = clouds = flight_visibility = None
    icing = location = other = remarks = None
    temperature = time = turbulence = wx_codes = None
    for element in elements:
        # Too short to hold a tag; this also covers empty pieces
        if len(element) < 2:
            continue
        tag, body = element[:2], element[2:].strip()
        if tag == "WX":
            wx_codes, flight_visibility, other = _wx(body)
        elif tag == "TP":
            aircraft = _aircraft(body)
        elif tag == "FL":
            altitude = _altitude(body)
        elif tag == "SK":
            clouds = _clouds(body)
        elif tag == "IC":
            icing = _icing(body)
        elif tag == "OV":
            location = _location(body)
        elif tag == "RM":
            remarks = _remarks(body)
        elif tag == "TA":
            temperature = _number(body)
        elif tag == "TM":
            time = _time(body, issued)
        elif tag == "TB":
            turbulence = _turbulence(body)
    parsed = PirepData(
        aircraft=aircraft,
        altitude=altitude,
        clouds=clouds,
        flight_visibility=flight_visibility,
        icing=icing,
        location=location,
        other=other or [],
        raw=report,
        remarks=remarks,
        sanitized=sanitized,
        station=station,
        temperature=temperature,
        time=time,
        turbulence=turbulence,
        type=report_type,
        wx_codes=wx_codes or [],
    )
    return parsed, sans
Returns a PirepData object based on the given report