avwx.current.pirep

A PIREP (Pilot Report) is an observation made by pilots in flight, meant to help controllers and other pilots route around adverse conditions and other conditions of note. PIREPs typically contain icing, turbulence, cloud types/bases/tops, and other info at a known distance and radial from a ground station. They are released as they come in.

  1"""
  2A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid
  3controllers and pilots routing around adverse conditions and other conditions
  4of note. They typically contain icing, turbulence, cloud types/bases/tops, and
  5other info at a known distance and radial from a ground station. They are
  6released as they come in.
  7"""
  8
  9# stdlib
 10from __future__ import annotations
 11
 12from contextlib import suppress
 13from typing import TYPE_CHECKING
 14
 15# module
 16from avwx import exceptions
 17from avwx.current.base import Reports, get_wx_codes
 18from avwx.parsing import core
 19from avwx.parsing.sanitization.pirep import clean_pirep_string
 20from avwx.service.scrape import NoaaScrapeList
 21from avwx.static.core import CARDINALS, CLOUD_LIST
 22from avwx.structs import (
 23    Aircraft,
 24    Cloud,
 25    Code,
 26    Coord,
 27    Icing,
 28    Location,
 29    Number,
 30    PirepData,
 31    Sanitization,
 32    Timestamp,
 33    Turbulence,
 34    Units,
 35)
 36
 37if TYPE_CHECKING:
 38    from datetime import date
 39
 40
 41class Pireps(Reports):
 42    """
 43    The Pireps class offers an object-oriented approach to managing multiple
 44    PIREP reports for a single station.
 45
 46    Below is typical usage for fetching and pulling PIREP data for KJFK.
 47
 48    ```python
 49    >>> from avwx import Pireps
 50    >>> kmco = Pireps("KMCO")
 51    >>> kmco.station.name
 52    'Orlando International Airport'
 53    >>> kmco.update()
 54    True
 55    >>> kmco.last_updated
 56    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
 57    >>> kmco.raw[0]
 58    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
 59    >>> kmco.data[0].location
 60    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
 61    ```
 62
 63    The `parse` and `from_report` methods can parse a report string if you want
 64    to override the normal fetching process.
 65    """
 66
 67    data: list[PirepData | None] | None = None  # type: ignore
 68    sanitization: list[Sanitization | None] | None = None  # type: ignore
 69
 70    def __init__(self, code: str | None = None, coord: Coord | None = None):
 71        super().__init__(code, coord)
 72        self.service = NoaaScrapeList("pirep")
 73
 74    @staticmethod
 75    def _report_filter(reports: list[str]) -> list[str]:
 76        """Remove AIREPs before updating raw_reports."""
 77        return [r for r in reports if not r.startswith("ARP")]
 78
 79    async def _post_update(self) -> None:
 80        self.data, self.sanitization = [], []
 81        if self.raw is None:
 82            return
 83        for report in self.raw:
 84            try:
 85                data, sans = parse(report, issued=self.issued)
 86                self.data.append(data)
 87                self.sanitization.append(sans)
 88            except Exception as exc:  # noqa: BLE001
 89                exceptions.exception_intercept(exc, raw=report)  # type: ignore
 90
 91    def _post_parse(self) -> None:
 92        self.data, self.sanitization = [], []
 93        if self.raw is None:
 94            return
 95        for report in self.raw:
 96            data, sans = parse(report, issued=self.issued)
 97            self.data.append(data)
 98            self.sanitization.append(sans)
 99
100    @staticmethod
101    def sanitize(report: str) -> str:
102        """Sanitize a PIREP string."""
103        return sanitize(report)[0]
104
105
# Unit set passed to core.get_visibility when parsing flight visibility in _wx
_UNITS = Units.north_american()
107
108
109def _root(report: str) -> tuple[str | None, str | None]:
110    """Parse report root data including station and report type."""
111    report_type = None
112    station = None
113    for item in report.split():
114        if item in ("UA", "UUA"):
115            report_type = item
116        elif not station:
117            station = item
118    return station, report_type
119
120
121def _location(item: str) -> Location | None:
122    """Convert a location element to a Location object."""
123    items = item.split()
124    for target in ("MILES", "OF"):
125        with suppress(ValueError):
126            items.remove(target)
127    if not items:
128        return None
129    station, direction, distance = None, None, None
130    direction_number, distance_number = None, None
131    if len(items) == 1:
132        ilen = len(item)
133        # MLB
134        if ilen < 5:
135            station = item
136        # MKK360002 or KLGA220015
137        elif ilen in {9, 10} and item[-6:].isdigit():
138            station, direction, distance = item[:-6], item[-6:-3], item[-3:]
139    # 10 WGON
140    # 10 EAST
141    # 15 SW LRP
142    elif items[0].isdigit():
143        if items[1] in CARDINALS:
144            distance, direction = items[0], items[1]
145            if len(items) == 3:
146                station = items[2]
147        else:
148            station, direction, distance = items[1][-3:], items[1][:-3], items[0]
149    # GON 270010
150    elif items[1].isdigit():
151        station, direction, distance = items[0], items[1][:3], items[1][3:]
152    # Convert non-null elements
153    if direction:
154        direction_number = core.make_number(direction, literal=True)
155    if distance:
156        distance_number = core.make_number(distance)
157    return Location(item, station, direction_number, distance_number)
158
159
def _time(item: str | None, target: date | None = None) -> Timestamp | None:
    """Build a Timestamp from a time-only /TM element.

    `target` is forwarded to core.make_timestamp as the target date.
    """
    timestamp = core.make_timestamp(item, time_only=True, target_date=target)
    return timestamp
163
164
165def _altitude(item: str) -> Number | str | None:
166    """Convert reporting altitude to a Number or string."""
167    alt = core.make_number(item) if item.isdigit() else item
168    return alt or None
169
170
def _aircraft(item: str) -> Aircraft | str:
    """Look up the Aircraft for an ICAO code.

    Falls back to the original text when the lookup raises ValueError.
    """
    with suppress(ValueError):
        return Aircraft.from_icao(item)
    return item
177
178
179def _non_digit_cloud(cloud: str) -> tuple[str | None, str]:
180    """Return cloud type and altitude for non-digit TOPS BASES cloud elements."""
181    # 5000FT
182    if cloud.endswith("FT"):
183        cloud = cloud[:-4]
184        if cloud.isdigit():
185            return None, cloud
186    if "-" not in cloud:
187        return cloud[:3], cloud[3:]
188    # SCT030-035
189    parts = cloud.split("-")
190    return (None, parts[-1]) if parts[0].isdigit() else (parts[0][:3], parts[-1])
191
192
193def _clouds(item: str) -> list[Cloud]:
194    """Convert cloud element to a list of Clouds."""
195    clouds = item.replace(",", "").split()
196    # BASES 004 TOPS 016
197    # BASES SCT030 TOPS SCT058
198    if "BASES" in clouds and "TOPS" in clouds:
199        cloud_type = None
200        base = clouds[clouds.index("BASES") + 1]
201        top = clouds[clouds.index("TOPS") + 1]
202        if not base.isdigit():
203            cloud_type, base = _non_digit_cloud(base)
204        if not top.isdigit():
205            cloud_type, top = _non_digit_cloud(top)
206        return [Cloud(item, cloud_type, base=int(base), top=int(top))]
207    return [core.make_cloud(cloud) for cloud in clouds]
208
209
210def _number(item: str) -> Number | None:
211    """Convert an element to a Number."""
212    value = item.strip("CF ")
213    return None if " " in value else core.make_number(value, item)
214
215
def _separate_floor_ceiling(item: str) -> tuple[Number | None, Number | None]:
    """Split a "low-high" string into floor and ceiling Numbers.

    The pair is swapped when both values are truthy and the first is
    greater than the second, so the floor is never above the ceiling.
    """
    low_text, high_text = item.split("-")
    low = core.make_number(low_text)
    high = core.make_number(high_text)
    inverted = low and high and low.value and high.value and low.value > high.value
    return (high, low) if inverted else (low, high)
224
225
226def _find_floor_ceiling(
227    items: list[str],
228) -> tuple[list[str], Number | None, Number | None]:
229    """Extract the floor and ceiling from item list."""
230    floor: Number | None = None
231    ceiling: Number | None = None
232
233    for i, item in enumerate(items):
234        hloc = item.find("-")
235        # TRACE RIME 070-090
236        if hloc > -1 and item[:hloc].isdigit() and item[hloc + 1 :].isdigit():
237            floor, ceiling = _separate_floor_ceiling(items.pop(i))
238            break
239        # CONT LGT CHOP BLO 250
240        if item == "BLO":
241            altitude = items[i + 1]
242            if "-" in altitude:
243                floor, ceiling = _separate_floor_ceiling(altitude)
244            else:
245                ceiling = core.make_number(altitude)
246            items = items[:i]
247            break
248        # LGT RIME 025
249        if item.isdigit():
250            num = core.make_number(item)
251            floor, ceiling = num, num
252            break
253    return items, floor, ceiling
254
255
def _turbulence(item: str) -> Turbulence:
    """Build a Turbulence object from a /TB element.

    The severity is whatever text remains after the floor and ceiling have
    been extracted.
    """
    remaining, floor, ceiling = _find_floor_ceiling(item.split())
    severity = " ".join(remaining)
    return Turbulence(severity=severity, floor=floor, ceiling=ceiling)
264
265
def _icing(item: str) -> Icing:
    """Build an Icing object from an /IC element.

    After the floor and ceiling are extracted, the first remaining token is
    the severity ("" if none) and the next one is the type (None if none).
    """
    remaining, floor, ceiling = _find_floor_ceiling(item.split())
    if remaining:
        severity, *rest = remaining
    else:
        severity, rest = "", []
    icing_type = rest[0] if rest else None
    return Icing(severity=severity, floor=floor, ceiling=ceiling, type=icing_type)
276
277
def _remarks(item: str) -> str:
    """Return the remarks element unchanged.

    Reserved for later parsing.
    """
    return item
281
282
def _wx(report: str) -> tuple[list[Code], Number | None, list[str]]:
    """Parse the /WX element.

    Returns the weather codes, the flight visibility (None if absent) and
    the tokens that were not recognised.
    """
    flight_visibility = None
    remaining: list[str] = []
    for token in report.split():
        # "FV" must be followed by a value to count as flight visibility
        is_visibility = token.startswith("FV") and len(token) > 2
        if not is_visibility:
            remaining.append(token)
            continue
        _, flight_visibility = core.get_visibility([token[2:]], _UNITS)
    remaining, wx_codes = get_wx_codes(remaining)
    return wx_codes, flight_visibility, remaining
294
295
def _sanitize_report_list(data: list[str], sans: Sanitization) -> list[str]:
    """Fix report elements based on neighbor values.

    `data` is modified in place: a spaced cloud top is joined to the previous
    element with a hyphen, and a cloud type separated from its altitude is
    merged with it. Each kind of fix is recorded on `sans`.

    Returns the result of core.dedupe on the fixed list.
    """
    # Walk a snapshot of (index, item) pairs from the end, so popping an
    # element never shifts the indexes that are still to be visited
    for i, item in reversed(list(enumerate(data))):
        # Fix spaced cloud top Ex: BKN030 TOP045   BASE020 TOPS074
        # But not BASES SCT014 TOPS SCT021
        if (
            item.startswith("TOP")
            and item != "TOPS"
            and i > 0
            and len(data[i - 1]) >= 6
            and (data[i - 1][:3] in CLOUD_LIST or data[i - 1].startswith("BASE"))
        ):
            # Capture the original spaced text before merging, for the log
            key = f"{data[i-1]} {item}"
            data[i - 1] += f"-{data.pop(i)}"
            sans.log(key, data[i - 1])
        # Fix separated clouds Ex: BASES OVC 049 TOPS 055
        elif item in CLOUD_LIST and i + 1 < len(data) and data[i + 1].isdigit():
            data[i] = item + data.pop(i + 1)
            sans.extra_spaces_found = True
    # NOTE(review): presumably drops adjacent repeated elements - confirm
    # against core.dedupe
    deduped = core.dedupe(data, only_neighbors=True)
    # A shorter list means dedupe removed at least one element
    if len(data) != len(deduped):
        sans.duplicates_found = True
    return deduped
319
320
def sanitize(report: str) -> tuple[str, Sanitization]:
    """Clean a raw report so it is ready for parsing.

    Returns the sanitized text, re-joined with single spaces, and the
    Sanitization object that recorded the changes.
    """
    sans = Sanitization()
    tokens = clean_pirep_string(report, sans).split()
    return " ".join(_sanitize_report_list(tokens, sans)), sans
327
328
def parse(report: str, issued: date | None = None) -> tuple[PirepData | None, Sanitization | None]:
    """Parse a raw PIREP into a PirepData object.

    Returns (None, None) for an empty report. Otherwise returns the parsed
    data and the Sanitization object recording the changes made.
    `issued` is passed to the time parser as the target date.
    """
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    # The text before the first slash is the root; the rest are tagged elements
    root, *sections = sanitized.split("/")
    station, report_type = _root(root.strip())
    # Tags that map onto a single field through a one-argument converter
    converters = {
        "OV": ("location", _location),
        "FL": ("altitude", _altitude),
        "TP": ("aircraft", _aircraft),
        "SK": ("clouds", _clouds),
        "TA": ("temperature", _number),
        "TB": ("turbulence", _turbulence),
        "IC": ("icing", _icing),
        "RM": ("remarks", _remarks),
    }
    fields = dict.fromkeys(
        (
            "aircraft",
            "altitude",
            "clouds",
            "flight_visibility",
            "icing",
            "location",
            "other",
            "remarks",
            "temperature",
            "time",
            "turbulence",
            "wx_codes",
        )
    )
    for section in sections:
        if len(section) < 2:
            continue
        tag, body = section[:2], section[2:].strip()
        if tag == "TM":
            fields["time"] = _time(body, issued)
        elif tag == "WX":
            fields["wx_codes"], fields["flight_visibility"], fields["other"] = _wx(body)
        elif tag in converters:
            name, convert = converters[tag]
            fields[name] = convert(body)
    # The list fields default to empty lists rather than None
    fields["other"] = fields["other"] or []
    fields["wx_codes"] = fields["wx_codes"] or []
    pirep = PirepData(
        raw=report,
        sanitized=sanitized,
        station=station,
        type=report_type,
        **fields,
    )
    return pirep, sans
class Pireps(avwx.current.base.Reports):
 42class Pireps(Reports):
 43    """
 44    The Pireps class offers an object-oriented approach to managing multiple
 45    PIREP reports for a single station.
 46
 47    Below is typical usage for fetching and pulling PIREP data for KJFK.
 48
 49    ```python
 50    >>> from avwx import Pireps
 51    >>> kmco = Pireps("KMCO")
 52    >>> kmco.station.name
 53    'Orlando International Airport'
 54    >>> kmco.update()
 55    True
 56    >>> kmco.last_updated
 57    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
 58    >>> kmco.raw[0]
 59    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
 60    >>> kmco.data[0].location
 61    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
 62    ```
 63
 64    The `parse` and `from_report` methods can parse a report string if you want
 65    to override the normal fetching process.
 66    """
 67
 68    data: list[PirepData | None] | None = None  # type: ignore
 69    sanitization: list[Sanitization | None] | None = None  # type: ignore
 70
 71    def __init__(self, code: str | None = None, coord: Coord | None = None):
 72        super().__init__(code, coord)
 73        self.service = NoaaScrapeList("pirep")
 74
 75    @staticmethod
 76    def _report_filter(reports: list[str]) -> list[str]:
 77        """Remove AIREPs before updating raw_reports."""
 78        return [r for r in reports if not r.startswith("ARP")]
 79
 80    async def _post_update(self) -> None:
 81        self.data, self.sanitization = [], []
 82        if self.raw is None:
 83            return
 84        for report in self.raw:
 85            try:
 86                data, sans = parse(report, issued=self.issued)
 87                self.data.append(data)
 88                self.sanitization.append(sans)
 89            except Exception as exc:  # noqa: BLE001
 90                exceptions.exception_intercept(exc, raw=report)  # type: ignore
 91
 92    def _post_parse(self) -> None:
 93        self.data, self.sanitization = [], []
 94        if self.raw is None:
 95            return
 96        for report in self.raw:
 97            data, sans = parse(report, issued=self.issued)
 98            self.data.append(data)
 99            self.sanitization.append(sans)
100
101    @staticmethod
102    def sanitize(report: str) -> str:
103        """Sanitize a PIREP string."""
104        return sanitize(report)[0]

The Pireps class offers an object-oriented approach to managing multiple PIREP reports for a single station.

Below is typical usage for fetching and pulling PIREP data for KMCO.

>>> from avwx import Pireps
>>> kmco = Pireps("KMCO")
>>> kmco.station.name
'Orlando International Airport'
>>> kmco.update()
True
>>> kmco.last_updated
datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
>>> kmco.raw[0]
'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
>>> kmco.data[0].location
Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))

The parse and from_report methods can parse a report string if you want to override the normal fetching process.

Pireps(code: str | None = None, coord: avwx.structs.Coord | None = None)
    def __init__(self, code: str | None = None, coord: Coord | None = None):
        """Initialise the base class with `code` and `coord`, then set
        `self.service` to a NoaaScrapeList for "pirep"."""
        super().__init__(code, coord)
        self.service = NoaaScrapeList("pirep")
data: list[avwx.structs.PirepData | None] | None = None
sanitization: list[avwx.structs.Sanitization | None] | None = None
service
@staticmethod
def sanitize(report: str) -> str:
101    @staticmethod
102    def sanitize(report: str) -> str:
103        """Sanitize a PIREP string."""
104        return sanitize(report)[0]

Sanitize a PIREP string.

def sanitize(report: str) -> tuple[str, avwx.structs.Sanitization]:
def sanitize(report: str) -> tuple[str, Sanitization]:
    """Clean a raw report so it is ready for parsing.

    Returns the sanitized text, re-joined with single spaces, and the
    Sanitization object that recorded the changes.
    """
    sans = Sanitization()
    tokens = clean_pirep_string(report, sans).split()
    return " ".join(_sanitize_report_list(tokens, sans)), sans

Return a sanitized report ready for parsing.

def parse( report: str, issued: datetime.date | None = None) -> tuple[avwx.structs.PirepData | None, avwx.structs.Sanitization | None]:
def parse(report: str, issued: date | None = None) -> tuple[PirepData | None, Sanitization | None]:
    """Parse a raw PIREP into a PirepData object.

    Returns (None, None) for an empty report. Otherwise returns the parsed
    data and the Sanitization object recording the changes made.
    `issued` is passed to the time parser as the target date.
    """
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    # The text before the first slash is the root; the rest are tagged elements
    root, *sections = sanitized.split("/")
    station, report_type = _root(root.strip())
    # Tags that map onto a single field through a one-argument converter
    converters = {
        "OV": ("location", _location),
        "FL": ("altitude", _altitude),
        "TP": ("aircraft", _aircraft),
        "SK": ("clouds", _clouds),
        "TA": ("temperature", _number),
        "TB": ("turbulence", _turbulence),
        "IC": ("icing", _icing),
        "RM": ("remarks", _remarks),
    }
    fields = dict.fromkeys(
        (
            "aircraft",
            "altitude",
            "clouds",
            "flight_visibility",
            "icing",
            "location",
            "other",
            "remarks",
            "temperature",
            "time",
            "turbulence",
            "wx_codes",
        )
    )
    for section in sections:
        if len(section) < 2:
            continue
        tag, body = section[:2], section[2:].strip()
        if tag == "TM":
            fields["time"] = _time(body, issued)
        elif tag == "WX":
            fields["wx_codes"], fields["flight_visibility"], fields["other"] = _wx(body)
        elif tag in converters:
            name, convert = converters[tag]
            fields[name] = convert(body)
    # The list fields default to empty lists rather than None
    fields["other"] = fields["other"] or []
    fields["wx_codes"] = fields["wx_codes"] or []
    pirep = PirepData(
        raw=report,
        sanitized=sanitized,
        station=station,
        type=report_type,
        **fields,
    )
    return pirep, sans

Return a PirepData object based on the given report.