avwx.current.pirep
A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid controllers and pilots routing around adverse conditions and other conditions of note. They typically contain icing, turbulence, cloud types/bases/tops, and other info at a known distance and radial from a ground station. They are released as they come in.
1""" 2A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid 3controllers and pilots routing around adverse conditions and other conditions 4of note. They typically contain icing, turbulence, cloud types/bases/tops, and 5other info at a known distance and radial from a ground station. They are 6released as they come in. 7""" 8 9# stdlib 10from __future__ import annotations 11 12from contextlib import suppress 13from typing import TYPE_CHECKING 14 15# module 16from avwx import exceptions 17from avwx.current.base import Reports, get_wx_codes 18from avwx.parsing import core 19from avwx.parsing.sanitization.pirep import clean_pirep_string 20from avwx.service.scrape import NoaaScrapeList 21from avwx.static.core import CARDINALS, CLOUD_LIST 22from avwx.structs import ( 23 Aircraft, 24 Cloud, 25 Code, 26 Coord, 27 Icing, 28 Location, 29 Number, 30 PirepData, 31 Sanitization, 32 Timestamp, 33 Turbulence, 34 Units, 35) 36 37if TYPE_CHECKING: 38 from datetime import date 39 40 41class Pireps(Reports): 42 """ 43 The Pireps class offers an object-oriented approach to managing multiple 44 PIREP reports for a single station. 45 46 Below is typical usage for fetching and pulling PIREP data for KJFK. 47 48 ```python 49 >>> from avwx import Pireps 50 >>> kmco = Pireps("KMCO") 51 >>> kmco.station.name 52 'Orlando International Airport' 53 >>> kmco.update() 54 True 55 >>> kmco.last_updated 56 datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc) 57 >>> kmco.raw[0] 58 'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L' 59 >>> kmco.data[0].location 60 Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five')) 61 ``` 62 63 The `parse` and `from_report` methods can parse a report string if you want 64 to override the normal fetching process. 
65 """ 66 67 data: list[PirepData | None] | None = None # type: ignore 68 sanitization: list[Sanitization | None] | None = None # type: ignore 69 70 def __init__(self, code: str | None = None, coord: Coord | None = None): 71 super().__init__(code, coord) 72 self.service = NoaaScrapeList("pirep") 73 74 @staticmethod 75 def _report_filter(reports: list[str]) -> list[str]: 76 """Remove AIREPs before updating raw_reports.""" 77 return [r for r in reports if not r.startswith("ARP")] 78 79 async def _post_update(self) -> None: 80 self.data, self.sanitization = [], [] 81 if self.raw is None: 82 return 83 for report in self.raw: 84 try: 85 data, sans = parse(report, issued=self.issued) 86 self.data.append(data) 87 self.sanitization.append(sans) 88 except Exception as exc: # noqa: BLE001 89 exceptions.exception_intercept(exc, raw=report) # type: ignore 90 91 def _post_parse(self) -> None: 92 self.data, self.sanitization = [], [] 93 if self.raw is None: 94 return 95 for report in self.raw: 96 data, sans = parse(report, issued=self.issued) 97 self.data.append(data) 98 self.sanitization.append(sans) 99 100 @staticmethod 101 def sanitize(report: str) -> str: 102 """Sanitize a PIREP string.""" 103 return sanitize(report)[0] 104 105 106_UNITS = Units.north_american() 107 108 109def _root(report: str) -> tuple[str | None, str | None]: 110 """Parse report root data including station and report type.""" 111 report_type = None 112 station = None 113 for item in report.split(): 114 if item in ("UA", "UUA"): 115 report_type = item 116 elif not station: 117 station = item 118 return station, report_type 119 120 121def _location(item: str) -> Location | None: 122 """Convert a location element to a Location object.""" 123 items = item.split() 124 for target in ("MILES", "OF"): 125 with suppress(ValueError): 126 items.remove(target) 127 if not items: 128 return None 129 station, direction, distance = None, None, None 130 direction_number, distance_number = None, None 131 if len(items) == 
1: 132 ilen = len(item) 133 # MLB 134 if ilen < 5: 135 station = item 136 # MKK360002 or KLGA220015 137 elif ilen in {9, 10} and item[-6:].isdigit(): 138 station, direction, distance = item[:-6], item[-6:-3], item[-3:] 139 # 10 WGON 140 # 10 EAST 141 # 15 SW LRP 142 elif items[0].isdigit(): 143 if items[1] in CARDINALS: 144 distance, direction = items[0], items[1] 145 if len(items) == 3: 146 station = items[2] 147 else: 148 station, direction, distance = items[1][-3:], items[1][:-3], items[0] 149 # GON 270010 150 elif items[1].isdigit(): 151 station, direction, distance = items[0], items[1][:3], items[1][3:] 152 # Convert non-null elements 153 if direction: 154 direction_number = core.make_number(direction, literal=True) 155 if distance: 156 distance_number = core.make_number(distance) 157 return Location(item, station, direction_number, distance_number) 158 159 160def _time(item: str | None, target: date | None = None) -> Timestamp | None: 161 """Convert a time element to a Timestamp.""" 162 return core.make_timestamp(item, time_only=True, target_date=target) 163 164 165def _altitude(item: str) -> Number | str | None: 166 """Convert reporting altitude to a Number or string.""" 167 alt = core.make_number(item) if item.isdigit() else item 168 return alt or None 169 170 171def _aircraft(item: str) -> Aircraft | str: 172 """Return the Aircraft from the ICAO code.""" 173 try: 174 return Aircraft.from_icao(item) 175 except ValueError: 176 return item 177 178 179def _non_digit_cloud(cloud: str) -> tuple[str | None, str]: 180 """Return cloud type and altitude for non-digit TOPS BASES cloud elements.""" 181 # 5000FT 182 if cloud.endswith("FT"): 183 cloud = cloud[:-4] 184 if cloud.isdigit(): 185 return None, cloud 186 if "-" not in cloud: 187 return cloud[:3], cloud[3:] 188 # SCT030-035 189 parts = cloud.split("-") 190 return (None, parts[-1]) if parts[0].isdigit() else (parts[0][:3], parts[-1]) 191 192 193def _clouds(item: str) -> list[Cloud]: 194 """Convert cloud element 
to a list of Clouds.""" 195 clouds = item.replace(",", "").split() 196 # BASES 004 TOPS 016 197 # BASES SCT030 TOPS SCT058 198 if "BASES" in clouds and "TOPS" in clouds: 199 cloud_type = None 200 base = clouds[clouds.index("BASES") + 1] 201 top = clouds[clouds.index("TOPS") + 1] 202 if not base.isdigit(): 203 cloud_type, base = _non_digit_cloud(base) 204 if not top.isdigit(): 205 cloud_type, top = _non_digit_cloud(top) 206 return [Cloud(item, cloud_type, base=int(base), top=int(top))] 207 return [core.make_cloud(cloud) for cloud in clouds] 208 209 210def _number(item: str) -> Number | None: 211 """Convert an element to a Number.""" 212 value = item.strip("CF ") 213 return None if " " in value else core.make_number(value, item) 214 215 216def _separate_floor_ceiling(item: str) -> tuple[Number | None, Number | None]: 217 """Extract floor and ceiling numbers from hyphen string.""" 218 floor_str, ceiling_str = item.split("-") 219 floor = core.make_number(floor_str) 220 ceiling = core.make_number(ceiling_str) 221 if floor and ceiling and floor.value and ceiling.value and floor.value > ceiling.value: 222 return ceiling, floor 223 return floor, ceiling 224 225 226def _find_floor_ceiling( 227 items: list[str], 228) -> tuple[list[str], Number | None, Number | None]: 229 """Extract the floor and ceiling from item list.""" 230 floor: Number | None = None 231 ceiling: Number | None = None 232 233 for i, item in enumerate(items): 234 hloc = item.find("-") 235 # TRACE RIME 070-090 236 if hloc > -1 and item[:hloc].isdigit() and item[hloc + 1 :].isdigit(): 237 floor, ceiling = _separate_floor_ceiling(items.pop(i)) 238 break 239 # CONT LGT CHOP BLO 250 240 if item == "BLO": 241 altitude = items[i + 1] 242 if "-" in altitude: 243 floor, ceiling = _separate_floor_ceiling(altitude) 244 else: 245 ceiling = core.make_number(altitude) 246 items = items[:i] 247 break 248 # LGT RIME 025 249 if item.isdigit(): 250 num = core.make_number(item) 251 floor, ceiling = num, num 252 break 253 
return items, floor, ceiling 254 255 256def _turbulence(item: str) -> Turbulence: 257 """Convert reported turbulence to a Turbulence object.""" 258 items, floor, ceiling = _find_floor_ceiling(item.split()) 259 return Turbulence( 260 severity=" ".join(items), 261 floor=floor, 262 ceiling=ceiling, 263 ) 264 265 266def _icing(item: str) -> Icing: 267 """Convert reported icing to an Icing object.""" 268 items, floor, ceiling = _find_floor_ceiling(item.split()) 269 severity = items.pop(0) if items else "" 270 return Icing( 271 severity=severity, 272 floor=floor, 273 ceiling=ceiling, 274 type=items[0] if items else None, 275 ) 276 277 278def _remarks(item: str) -> str: 279 """Returns the remarks. Reserved for later parsing""" 280 return item 281 282 283def _wx(report: str) -> tuple[list[Code], Number | None, list[str]]: 284 """Parse remaining weather elements.""" 285 other: list[str] = [] 286 flight_visibility = None 287 for item in report.split(): 288 if len(item) > 2 and item.startswith("FV"): 289 _, flight_visibility = core.get_visibility([item[2:]], _UNITS) 290 else: 291 other.append(item) 292 other, wx_codes = get_wx_codes(other) 293 return wx_codes, flight_visibility, other 294 295 296def _sanitize_report_list(data: list[str], sans: Sanitization) -> list[str]: 297 """Fix report elements based on neighbor values.""" 298 for i, item in reversed(list(enumerate(data))): 299 # Fix spaced cloud top Ex: BKN030 TOP045 BASE020 TOPS074 300 # But not BASES SCT014 TOPS SCT021 301 if ( 302 item.startswith("TOP") 303 and item != "TOPS" 304 and i > 0 305 and len(data[i - 1]) >= 6 306 and (data[i - 1][:3] in CLOUD_LIST or data[i - 1].startswith("BASE")) 307 ): 308 key = f"{data[i-1]} {item}" 309 data[i - 1] += f"-{data.pop(i)}" 310 sans.log(key, data[i - 1]) 311 # Fix separated clouds Ex: BASES OVC 049 TOPS 055 312 elif item in CLOUD_LIST and i + 1 < len(data) and data[i + 1].isdigit(): 313 data[i] = item + data.pop(i + 1) 314 sans.extra_spaces_found = True 315 deduped = 
core.dedupe(data, only_neighbors=True) 316 if len(data) != len(deduped): 317 sans.duplicates_found = True 318 return deduped 319 320 321def sanitize(report: str) -> tuple[str, Sanitization]: 322 """Return a sanitized report ready for parsing.""" 323 sans = Sanitization() 324 clean = clean_pirep_string(report, sans) 325 data = _sanitize_report_list(clean.split(), sans) 326 return " ".join(data), sans 327 328 329def parse(report: str, issued: date | None = None) -> tuple[PirepData | None, Sanitization | None]: 330 """Return a PirepData object based on the given report.""" 331 if not report: 332 return None, None 333 sanitized, sans = sanitize(report) 334 data = sanitized.split("/") 335 station, report_type = _root(data.pop(0).strip()) 336 time, location, altitude, aircraft = None, None, None, None 337 clouds, temperature, turbulence, other = None, None, None, None 338 icing, remarks, flight_visibility, wx_codes = None, None, None, None 339 for item in data: 340 if not item or len(item) < 2: 341 continue 342 tag = item[:2] 343 item = item[2:].strip() # noqa: PLW2901 344 if tag == "TM": 345 time = _time(item, issued) 346 elif tag == "OV": 347 location = _location(item) 348 elif tag == "FL": 349 altitude = _altitude(item) 350 elif tag == "TP": 351 aircraft = _aircraft(item) 352 elif tag == "SK": 353 clouds = _clouds(item) 354 elif tag == "TA": 355 temperature = _number(item) 356 elif tag == "TB": 357 turbulence = _turbulence(item) 358 elif tag == "IC": 359 icing = _icing(item) 360 elif tag == "RM": 361 remarks = _remarks(item) 362 elif tag == "WX": 363 wx_codes, flight_visibility, other = _wx(item) 364 return ( 365 PirepData( 366 aircraft=aircraft, 367 altitude=altitude, 368 clouds=clouds, 369 flight_visibility=flight_visibility, 370 icing=icing, 371 location=location, 372 other=other or [], 373 raw=report, 374 remarks=remarks, 375 sanitized=sanitized, 376 station=station, 377 temperature=temperature, 378 time=time, 379 turbulence=turbulence, 380 type=report_type, 381 
wx_codes=wx_codes or [], 382 ), 383 sans, 384 )
class Pireps(Reports):
    """
    The Pireps class offers an object-oriented approach to managing multiple
    PIREP reports for a single station.

    Below is typical usage for fetching and pulling PIREP data for KMCO.

    ```python
    >>> from avwx import Pireps
    >>> kmco = Pireps("KMCO")
    >>> kmco.station.name
    'Orlando International Airport'
    >>> kmco.update()
    True
    >>> kmco.last_updated
    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
    >>> kmco.raw[0]
    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
    >>> kmco.data[0].location
    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process.
    """

    # Parsed reports and their sanitization logs, rebuilt by the _post_* hooks
    data: list[PirepData | None] | None = None  # type: ignore
    sanitization: list[Sanitization | None] | None = None  # type: ignore

    def __init__(self, code: str | None = None, coord: Coord | None = None):
        """Initialize the base report and attach the NOAA PIREP scrape service."""
        super().__init__(code, coord)
        self.service = NoaaScrapeList("pirep")

    @staticmethod
    def _report_filter(reports: list[str]) -> list[str]:
        """Remove AIREPs before updating raw_reports."""
        return [r for r in reports if not r.startswith("ARP")]

    async def _post_update(self) -> None:
        """Parse every raw report, routing parse errors to the exception interceptor."""
        self.data, self.sanitization = [], []
        if self.raw is None:
            return
        for report in self.raw:
            try:
                data, sans = parse(report, issued=self.issued)
                self.data.append(data)
                self.sanitization.append(sans)
            except Exception as exc:  # noqa: BLE001
                # NOTE(review): nothing is appended for a failing report, so if the
                # interceptor returns, data can end up shorter than raw
                exceptions.exception_intercept(exc, raw=report)  # type: ignore

    def _post_parse(self) -> None:
        """Parse every raw report, letting any parse error propagate."""
        self.data, self.sanitization = [], []
        if self.raw is None:
            return
        for report in self.raw:
            data, sans = parse(report, issued=self.issued)
            self.data.append(data)
            self.sanitization.append(sans)

    @staticmethod
    def sanitize(report: str) -> str:
        """Sanitize a PIREP string."""
        return sanitize(report)[0]
The Pireps class offers an object-oriented approach to managing multiple PIREP reports for a single station.
Below is typical usage for fetching and pulling PIREP data for KMCO.
>>> from avwx import Pireps
>>> kmco = Pireps("KMCO")
>>> kmco.station.name
'Orlando International Airport'
>>> kmco.update()
True
>>> kmco.last_updated
datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
>>> kmco.raw[0]
'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
>>> kmco.data[0].location
Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
The `parse` and `from_report` methods can parse a report string if you want to override the normal fetching process.
Pireps(code: str | None = None, coord: avwx.structs.Coord | None = None)
def sanitize(report: str) -> tuple[str, Sanitization]:
    """Clean a raw PIREP string and return it with its sanitization log.

    The report is first cleaned as a whole string, then split on whitespace
    so neighboring tokens can be repaired, and finally re-joined with single
    spaces.
    """
    log = Sanitization()
    tokens = clean_pirep_string(report, log).split()
    tokens = _sanitize_report_list(tokens, log)
    return " ".join(tokens), log
Return a sanitized report ready for parsing.
def
parse( report: str, issued: datetime.date | None = None) -> tuple[avwx.structs.PirepData | None, avwx.structs.Sanitization | None]:
def parse(report: str, issued: date | None = None) -> tuple[PirepData | None, Sanitization | None]:
    """Build a PirepData from a raw PIREP string.

    Returns `(None, None)` for an empty report. Otherwise the report is
    sanitized and split on "/": the leading section supplies the station and
    report type, and every following section is routed by its two-letter tag.
    Sections shorter than two characters or with an unknown tag are skipped,
    and a repeated tag overwrites the earlier value.
    """
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    root, *sections = sanitized.split("/")
    station, report_type = _root(root.strip())

    # Tags whose parser takes only the section text and fills a single field
    single_field_parsers = {
        "OV": ("location", _location),
        "FL": ("altitude", _altitude),
        "TP": ("aircraft", _aircraft),
        "SK": ("clouds", _clouds),
        "TA": ("temperature", _number),
        "TB": ("turbulence", _turbulence),
        "IC": ("icing", _icing),
        "RM": ("remarks", _remarks),
    }
    fields = dict.fromkeys(
        (
            "time",
            "location",
            "altitude",
            "aircraft",
            "clouds",
            "temperature",
            "turbulence",
            "icing",
            "remarks",
            "wx_codes",
            "flight_visibility",
            "other",
        )
    )

    for section in sections:
        if len(section) < 2:
            continue
        tag, body = section[:2], section[2:].strip()
        if tag == "TM":
            # The only parser that also needs the issued date
            fields["time"] = _time(body, issued)
        elif tag == "WX":
            # The only parser that fills three fields at once
            codes, visibility, leftover = _wx(body)
            fields["wx_codes"] = codes
            fields["flight_visibility"] = visibility
            fields["other"] = leftover
        elif tag in single_field_parsers:
            field_name, parser = single_field_parsers[tag]
            fields[field_name] = parser(body)

    pirep = PirepData(
        aircraft=fields["aircraft"],
        altitude=fields["altitude"],
        clouds=fields["clouds"],
        flight_visibility=fields["flight_visibility"],
        icing=fields["icing"],
        location=fields["location"],
        other=fields["other"] or [],
        raw=report,
        remarks=fields["remarks"],
        sanitized=sanitized,
        station=station,
        temperature=fields["temperature"],
        time=fields["time"],
        turbulence=fields["turbulence"],
        type=report_type,
        wx_codes=fields["wx_codes"] or [],
    )
    return pirep, sans
Return a PirepData object based on the given report.