avwx.current.pirep

A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid controllers and pilots routing around adverse conditions and other conditions of note. They typically contain icing, turbulence, cloud types/bases/tops, and other info at a known distance and radial from a ground station. They are released as they come in.

  1"""
  2A PIREP (Pilot Report) is an observation made by pilots inflight meant to aid
  3controllers and pilots routing around adverse conditions and other conditions
  4of note. They typically contain icing, turbulence, cloud types/bases/tops, and
  5other info at a known distance and radial from a ground station. They are
  6released as they come in.
  7"""
  8
  9# pylint: disable=too-many-boolean-expressions
 10
 11# stdlib
 12from contextlib import suppress
 13from datetime import date
 14from typing import List, Optional, Tuple, Union
 15
 16# module
 17from avwx import exceptions
 18from avwx.current.base import Reports, get_wx_codes
 19from avwx.parsing import core
 20from avwx.parsing.sanitization.pirep import clean_pirep_string
 21from avwx.service.scrape import NOAA_ScrapeList
 22from avwx.static.core import CARDINALS, CLOUD_LIST
 23from avwx.structs import (
 24    Aircraft,
 25    Cloud,
 26    Code,
 27    Coord,
 28    Icing,
 29    Location,
 30    Number,
 31    PirepData,
 32    Sanitization,
 33    Timestamp,
 34    Turbulence,
 35    Units,
 36)
 37
 38
 39class Pireps(Reports):
 40    """
 41    The Pireps class offers an object-oriented approach to managing multiple
 42    PIREP reports for a single station.
 43
 44    Below is typical usage for fetching and pulling PIREP data for KJFK.
 45
 46    ```python
 47    >>> from avwx import Pireps
 48    >>> kmco = Pireps("KMCO")
 49    >>> kmco.station.name
 50    'Orlando International Airport'
 51    >>> kmco.update()
 52    True
 53    >>> kmco.last_updated
 54    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
 55    >>> kmco.raw[0]
 56    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
 57    >>> kmco.data[0].location
 58    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
 59    ```
 60
 61    The `parse` and `from_report` methods can parse a report string if you want
 62    to override the normal fetching process.
 63    """
 64
 65    data: Optional[List[Optional[PirepData]]] = None  # type: ignore
 66    sanitization: Optional[List[Optional[Sanitization]]] = None  # type: ignore
 67
 68    def __init__(self, code: Optional[str] = None, coord: Optional[Coord] = None):
 69        super().__init__(code, coord)
 70        self.service = NOAA_ScrapeList("pirep")
 71
 72    @staticmethod
 73    def _report_filter(reports: List[str]) -> List[str]:
 74        """Removes AIREPs before updating raw_reports"""
 75        return [r for r in reports if not r.startswith("ARP")]
 76
 77    async def _post_update(self) -> None:
 78        self.data, self.sanitization = [], []
 79        if self.raw is None:
 80            return
 81        for report in self.raw:
 82            try:
 83                data, sans = parse(report, issued=self.issued)
 84                self.data.append(data)
 85                self.sanitization.append(sans)
 86            except Exception as exc:  # pylint: disable=broad-except
 87                exceptions.exception_intercept(exc, raw=report)  # type: ignore
 88
 89    def _post_parse(self) -> None:
 90        self.data, self.sanitization = [], []
 91        if self.raw is None:
 92            return
 93        for report in self.raw:
 94            data, sans = parse(report, issued=self.issued)
 95            self.data.append(data)
 96            self.sanitization.append(sans)
 97
 98    @staticmethod
 99    def sanitize(report: str) -> str:
100        """Sanitizes a PIREP string"""
101        return sanitize(report)[0]
102
103
# Unit set handed to core.get_visibility when _wx parses a flight visibility element
_UNITS = Units.north_american()
105
106
107def _root(item: str) -> Tuple[Optional[str], Optional[str]]:
108    """Parses report root data including station and report type"""
109    # pylint: disable=redefined-argument-from-local
110    report_type = None
111    station = None
112    for item in item.split():
113        if item in ("UA", "UUA"):
114            report_type = item
115        elif not station:
116            station = item
117    return station, report_type
118
119
120def _location(item: str) -> Optional[Location]:
121    """Convert a location element to a Location object"""
122    items = item.split()
123    for target in ("MILES", "OF"):
124        with suppress(ValueError):
125            items.remove(target)
126    if not items:
127        return None
128    station, direction, distance = None, None, None
129    direction_number, distance_number = None, None
130    if len(items) == 1:
131        ilen = len(item)
132        # MLB
133        if ilen < 5:
134            station = item
135        # MKK360002 or KLGA220015
136        elif ilen in {9, 10} and item[-6:].isdigit():
137            station, direction, distance = item[:-6], item[-6:-3], item[-3:]
138    # 10 WGON
139    # 10 EAST
140    # 15 SW LRP
141    elif items[0].isdigit():
142        if items[1] in CARDINALS:
143            distance, direction = items[0], items[1]
144            if len(items) == 3:
145                station = items[2]
146        else:
147            station, direction, distance = items[1][-3:], items[1][:-3], items[0]
148    # GON 270010
149    elif items[1].isdigit():
150        station, direction, distance = items[0], items[1][:3], items[1][3:]
151    # Convert non-null elements
152    if direction:
153        direction_number = core.make_number(direction, literal=True)
154    if distance:
155        distance_number = core.make_number(distance)
156    return Location(item, station, direction_number, distance_number)
157
158
def _time(item: str, target: Optional[date] = None) -> Optional[Timestamp]:
    """Builds a Timestamp from a time-only element, passing target as the target date"""
    options = {"time_only": True, "target_date": target}
    return core.make_timestamp(item, **options)
162
163
164def _altitude(item: str) -> Union[Number, str, None]:
165    """Convert reporting altitude to a Number or string"""
166    alt = core.make_number(item) if item.isdigit() else item
167    return alt or None
168
169
def _aircraft(item: str) -> Union[Aircraft, str]:
    """Looks up the Aircraft for an ICAO code

    Falls back to returning the code itself when the lookup raises ValueError.
    """
    with suppress(ValueError):
        return Aircraft.from_icao(item)
    return item
176
177
178def _non_digit_cloud(cloud: str) -> Tuple[Optional[str], str]:
179    """Returns cloud type and altitude for non-digit TOPS BASES cloud elements"""
180    # 5000FT
181    if cloud.endswith("FT"):
182        cloud = cloud[:-4]
183        if cloud.isdigit():
184            return None, cloud
185    if "-" not in cloud:
186        return cloud[:3], cloud[3:]
187    # SCT030-035
188    parts = cloud.split("-")
189    return (None, parts[-1]) if parts[0].isdigit() else (parts[0][:3], parts[-1])
190
191
192def _clouds(item: str) -> List[Cloud]:
193    """Convert cloud element to a list of Clouds"""
194    clouds = item.replace(",", "").split()
195    # BASES 004 TOPS 016
196    # BASES SCT030 TOPS SCT058
197    if "BASES" in clouds and "TOPS" in clouds:
198        cloud_type = None
199        base = clouds[clouds.index("BASES") + 1]
200        top = clouds[clouds.index("TOPS") + 1]
201        if not base.isdigit():
202            cloud_type, base = _non_digit_cloud(base)
203        if not top.isdigit():
204            cloud_type, top = _non_digit_cloud(top)
205        return [Cloud(item, cloud_type, base=int(base), top=int(top))]
206    return [core.make_cloud(cloud) for cloud in clouds]
207
208
209def _number(item: str) -> Optional[Number]:
210    """Convert an element to a Number"""
211    value = item.strip("CF ")
212    return None if " " in value else core.make_number(value, item)
213
214
def _separate_floor_ceiling(item: str) -> Tuple[Optional[Number], Optional[Number]]:
    """Splits a "low-high" string into floor and ceiling numbers

    The two numbers are swapped when both have truthy values and the first
    is greater than the second. Raises ValueError unless the text contains
    exactly one hyphen.
    """
    lower_text, upper_text = item.split("-")
    lower = core.make_number(lower_text)
    upper = core.make_number(upper_text)
    if not (lower and upper):
        return lower, upper
    if lower.value and upper.value and lower.value > upper.value:
        return upper, lower
    return lower, upper
229
230
231def _find_floor_ceiling(
232    items: List[str],
233) -> Tuple[List[str], Optional[Number], Optional[Number]]:
234    """Extracts the floor and ceiling from item list"""
235    floor: Optional[Number] = None
236    ceiling: Optional[Number] = None
237
238    for i, item in enumerate(items):
239        hloc = item.find("-")
240        # TRACE RIME 070-090
241        if hloc > -1 and item[:hloc].isdigit() and item[hloc + 1 :].isdigit():
242            floor, ceiling = _separate_floor_ceiling(items.pop(i))
243            break
244        # CONT LGT CHOP BLO 250
245        if item == "BLO":
246            altitude = items[i + 1]
247            if "-" in altitude:
248                floor, ceiling = _separate_floor_ceiling(altitude)
249            else:
250                ceiling = core.make_number(altitude)
251            items = items[:i]
252            break
253        # LGT RIME 025
254        if item.isdigit():
255            num = core.make_number(item)
256            floor, ceiling = num, num
257            break
258    return items, floor, ceiling
259
260
def _turbulence(item: str) -> Turbulence:
    """Builds a Turbulence object from a reported turbulence element

    The severity is the space-joined text left over once the floor and
    ceiling have been extracted.
    """
    words, low, high = _find_floor_ceiling(item.split())
    description = " ".join(words)
    return Turbulence(severity=description, floor=low, ceiling=high)
269
270
def _icing(item: str) -> Icing:
    """Builds an Icing object from a reported icing element

    Once the floor and ceiling are extracted, the first remaining word is
    the severity (empty string when none remain) and the next word, if any,
    is the icing type.
    """
    words, low, high = _find_floor_ceiling(item.split())
    severity, ice_type = "", None
    if words:
        severity, *rest = words
        if rest:
            ice_type = rest[0]
    return Icing(severity=severity, floor=low, ceiling=high, type=ice_type)
281
282
283def _remarks(item: str) -> str:
284    """Returns the remarks. Reserved for later parsing"""
285    return item
286
287
def _wx(item: str) -> Tuple[List[Code], Optional[Number], List[str]]:
    """Parses the remaining weather elements

    Tokens beginning with "FV" and longer than two characters supply the
    flight visibility (the last one wins). All other tokens go to
    get_wx_codes, which separates weather codes from unrecognized text.

    Returns the weather codes, the flight visibility or None, and the
    leftover tokens.
    """
    leftover: List[str] = []
    visibility = None
    for token in item.split():
        is_visibility = token.startswith("FV") and len(token) > 2
        if not is_visibility:
            leftover.append(token)
            continue
        _unused, visibility = core.get_visibility([token[2:]], _UNITS)
    leftover, codes = get_wx_codes(leftover)
    return codes, visibility, leftover
300
301
def _sanitize_report_list(data: List[str], sans: Sanitization) -> List[str]:
    """Repairs report tokens using their neighbors, then removes adjacent duplicates

    The token list is edited in place while merging. Merges are recorded on
    sans, and sans.duplicates_found is set when deduplication shortens the
    list. Returns the deduplicated list.
    """
    # Walk a snapshot from the end so that popping a token never shifts the
    # position of a token that has not been visited yet
    snapshot = list(enumerate(data))
    for idx, token in reversed(snapshot):
        loose_top = token.startswith("TOP") and token != "TOPS" and idx > 0
        if loose_top:
            previous = data[idx - 1]
            loose_top = len(previous) >= 6 and (
                previous[:3] in CLOUD_LIST or previous.startswith("BASE")
            )
        if loose_top:
            # Fix spaced cloud top Ex: BKN030 TOP045   BASE020 TOPS074
            # But not BASES SCT014 TOPS SCT021
            key = f"{previous} {token}"
            merged = f"{previous}-{data.pop(idx)}"
            data[idx - 1] = merged
            sans.log(key, merged)
        elif token in CLOUD_LIST and idx + 1 < len(data) and data[idx + 1].isdigit():
            # Fix separated clouds Ex: BASES OVC 049 TOPS 055
            data[idx] = token + data.pop(idx + 1)
            sans.extra_spaces_found = True
    deduped = core.dedupe(data, only_neighbors=True)
    if len(deduped) != len(data):
        sans.duplicates_found = True
    return deduped
325
326
def sanitize(report: str) -> Tuple[str, Sanitization]:
    """Cleans a raw report so it is ready for parsing

    Returns the cleaned report text and the Sanitization object that the
    cleaning steps wrote to.
    """
    sans = Sanitization()
    tokens = clean_pirep_string(report, sans).split()
    tokens = _sanitize_report_list(tokens, sans)
    return " ".join(tokens), sans
333
334
def parse(
    report: str, issued: Optional[date] = None
) -> Tuple[Optional[PirepData], Optional[Sanitization]]:
    """Parses a raw report into a PirepData object and its Sanitization record

    Returns (None, None) for an empty report. The sanitized report is split
    on "/": the first piece supplies the station and report type, and each
    later piece is routed by its two-character tag. Pieces shorter than two
    characters or carrying an unrecognized tag are ignored, and a repeated
    tag overwrites the earlier value. `issued` is passed to the time parser
    as the target date.
    """
    # pylint: disable=too-many-locals,too-many-branches
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    root, *elements = sanitized.split("/")
    station, report_type = _root(root.strip())
    # Tags whose element maps onto exactly one PirepData field
    simple_parsers = {
        "OV": ("location", _location),
        "FL": ("altitude", _altitude),
        "TP": ("aircraft", _aircraft),
        "SK": ("clouds", _clouds),
        "TA": ("temperature", _number),
        "TB": ("turbulence", _turbulence),
        "IC": ("icing", _icing),
        "RM": ("remarks", _remarks),
    }
    fields = {
        "aircraft": None,
        "altitude": None,
        "clouds": None,
        "flight_visibility": None,
        "icing": None,
        "location": None,
        "other": None,
        "remarks": None,
        "temperature": None,
        "time": None,
        "turbulence": None,
        "wx_codes": None,
    }
    for element in elements:
        if len(element) < 2:
            continue
        tag, body = element[:2], element[2:].strip()
        if tag == "TM":
            # The time parser is the only one that needs the issued date
            fields["time"] = _time(body, issued)
        elif tag == "WX":
            # One weather element fills three fields
            (
                fields["wx_codes"],
                fields["flight_visibility"],
                fields["other"],
            ) = _wx(body)
        elif tag in simple_parsers:
            name, parser = simple_parsers[tag]
            fields[name] = parser(body)
    # The two list fields default to empty lists rather than None
    fields["other"] = fields["other"] or []
    fields["wx_codes"] = fields["wx_codes"] or []
    pirep = PirepData(
        raw=report,
        sanitized=sanitized,
        station=station,
        type=report_type,
        **fields,
    )
    return pirep, sans
class Pireps(avwx.current.base.Reports):
 40class Pireps(Reports):
 41    """
 42    The Pireps class offers an object-oriented approach to managing multiple
 43    PIREP reports for a single station.
 44
 45    Below is typical usage for fetching and pulling PIREP data for KJFK.
 46
 47    ```python
 48    >>> from avwx import Pireps
 49    >>> kmco = Pireps("KMCO")
 50    >>> kmco.station.name
 51    'Orlando International Airport'
 52    >>> kmco.update()
 53    True
 54    >>> kmco.last_updated
 55    datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
 56    >>> kmco.raw[0]
 57    'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
 58    >>> kmco.data[0].location
 59    Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))
 60    ```
 61
 62    The `parse` and `from_report` methods can parse a report string if you want
 63    to override the normal fetching process.
 64    """
 65
 66    data: Optional[List[Optional[PirepData]]] = None  # type: ignore
 67    sanitization: Optional[List[Optional[Sanitization]]] = None  # type: ignore
 68
 69    def __init__(self, code: Optional[str] = None, coord: Optional[Coord] = None):
 70        super().__init__(code, coord)
 71        self.service = NOAA_ScrapeList("pirep")
 72
 73    @staticmethod
 74    def _report_filter(reports: List[str]) -> List[str]:
 75        """Removes AIREPs before updating raw_reports"""
 76        return [r for r in reports if not r.startswith("ARP")]
 77
 78    async def _post_update(self) -> None:
 79        self.data, self.sanitization = [], []
 80        if self.raw is None:
 81            return
 82        for report in self.raw:
 83            try:
 84                data, sans = parse(report, issued=self.issued)
 85                self.data.append(data)
 86                self.sanitization.append(sans)
 87            except Exception as exc:  # pylint: disable=broad-except
 88                exceptions.exception_intercept(exc, raw=report)  # type: ignore
 89
 90    def _post_parse(self) -> None:
 91        self.data, self.sanitization = [], []
 92        if self.raw is None:
 93            return
 94        for report in self.raw:
 95            data, sans = parse(report, issued=self.issued)
 96            self.data.append(data)
 97            self.sanitization.append(sans)
 98
 99    @staticmethod
100    def sanitize(report: str) -> str:
101        """Sanitizes a PIREP string"""
102        return sanitize(report)[0]

The Pireps class offers an object-oriented approach to managing multiple PIREP reports for a single station.

Below is typical usage for fetching and pulling PIREP data for KMCO.

>>> from avwx import Pireps
>>> kmco = Pireps("KMCO")
>>> kmco.station.name
'Orlando International Airport'
>>> kmco.update()
True
>>> kmco.last_updated
datetime.datetime(2019, 5, 24, 13, 31, 46, 561732, tzinfo=datetime.timezone.utc)
>>> kmco.raw[0]
'FLL UA /OV KFLL275015/TM 1241/FL020/TP B737/SK TOP020/RM DURD RY10L'
>>> kmco.data[0].location
Location(repr='KFLL275015', station='KFLL', direction=Number(repr='275', value=275, spoken='two seven five'), distance=Number(repr='015', value=15, spoken='one five'))

The parse and from_report methods can parse a report string if you want to override the normal fetching process.

Pireps( code: Optional[str] = None, coord: Optional[avwx.structs.Coord] = None)
69    def __init__(self, code: Optional[str] = None, coord: Optional[Coord] = None):
70        super().__init__(code, coord)
71        self.service = NOAA_ScrapeList("pirep")
data: Optional[List[Optional[avwx.structs.PirepData]]] = None
sanitization: Optional[List[Optional[avwx.structs.Sanitization]]] = None
service
@staticmethod
def sanitize(report: str) -> str:
 99    @staticmethod
100    def sanitize(report: str) -> str:
101        """Sanitizes a PIREP string"""
102        return sanitize(report)[0]

Sanitizes a PIREP string

def sanitize(report: str) -> Tuple[str, avwx.structs.Sanitization]:
def sanitize(report: str) -> Tuple[str, Sanitization]:
    """Cleans a raw report so it is ready for parsing

    Returns the cleaned report text and the Sanitization object that the
    cleaning steps wrote to.
    """
    sans = Sanitization()
    tokens = clean_pirep_string(report, sans).split()
    tokens = _sanitize_report_list(tokens, sans)
    return " ".join(tokens), sans

Returns a sanitized report ready for parsing

def parse( report: str, issued: Optional[datetime.date] = None) -> Tuple[Optional[avwx.structs.PirepData], Optional[avwx.structs.Sanitization]]:
def parse(
    report: str, issued: Optional[date] = None
) -> Tuple[Optional[PirepData], Optional[Sanitization]]:
    """Parses a raw report into a PirepData object and its Sanitization record

    Returns (None, None) for an empty report. The sanitized report is split
    on "/": the first piece supplies the station and report type, and each
    later piece is routed by its two-character tag. Pieces shorter than two
    characters or carrying an unrecognized tag are ignored, and a repeated
    tag overwrites the earlier value. `issued` is passed to the time parser
    as the target date.
    """
    # pylint: disable=too-many-locals,too-many-branches
    if not report:
        return None, None
    sanitized, sans = sanitize(report)
    root, *elements = sanitized.split("/")
    station, report_type = _root(root.strip())
    # Tags whose element maps onto exactly one PirepData field
    simple_parsers = {
        "OV": ("location", _location),
        "FL": ("altitude", _altitude),
        "TP": ("aircraft", _aircraft),
        "SK": ("clouds", _clouds),
        "TA": ("temperature", _number),
        "TB": ("turbulence", _turbulence),
        "IC": ("icing", _icing),
        "RM": ("remarks", _remarks),
    }
    fields = {
        "aircraft": None,
        "altitude": None,
        "clouds": None,
        "flight_visibility": None,
        "icing": None,
        "location": None,
        "other": None,
        "remarks": None,
        "temperature": None,
        "time": None,
        "turbulence": None,
        "wx_codes": None,
    }
    for element in elements:
        if len(element) < 2:
            continue
        tag, body = element[:2], element[2:].strip()
        if tag == "TM":
            # The time parser is the only one that needs the issued date
            fields["time"] = _time(body, issued)
        elif tag == "WX":
            # One weather element fills three fields
            (
                fields["wx_codes"],
                fields["flight_visibility"],
                fields["other"],
            ) = _wx(body)
        elif tag in simple_parsers:
            name, parser = simple_parsers[tag]
            fields[name] = parser(body)
    # The two list fields default to empty lists rather than None
    fields["other"] = fields["other"] or []
    fields["wx_codes"] = fields["wx_codes"] or []
    pirep = PirepData(
        raw=report,
        sanitized=sanitized,
        station=station,
        type=report_type,
        **fields,
    )
    return pirep, sans

Returns a PirepData object based on the given report