avwx.current.airsigmet

A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. They are divided into:

  • Convective - thunderstorms, hail, and cyclones
  • Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation

An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. They are divided into:

  • Sierra - IFR conditions like low ceilings and mountain obscuration
  • Tango - turbulence and high surface winds
  • Zulu - icing and freezing levels

Both types share a similar report format and therefore are combined into a single handling class. The Bulletin and weather type can be used to classify each as a SIGMET or AIRMET for filtering purposes.

  1"""
  2A SIGMET (Significant Meteorological Information) is a weather advisory for the
  3safety of all aircraft. They are divided into:
  4
  5- Convective - thunderstorms, hail, and cyclones
  6- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation
  7
  8An AIRMET (Airman's Meteorological Information) is a weather advisory for
  9smaller aircraft or VFR navigation. They are divided into:
 10
 11- Sierra - IFR conditions like low ceilings and mountain obscuration
 12- Tango - turbulence and high surface winds
 13- Zulu - icing and freezing levels
 14
 15Both types share a similar report format and therefore are combined into a
 16single handling class. The `Bulletin` and weather type can be used to classify
 17each as a SIGMET or AIRMET for filtering purposes.
 18"""
 19
 20# stdlib
 21import asyncio as aio
 22import re
 23from contextlib import suppress
 24from datetime import date, datetime, timezone
 25from itertools import chain
 26from typing import List, Optional, Tuple
 27
 28# library
 29from geopy.distance import distance as geo_distance  # type: ignore
 30
 31# module
 32from avwx import exceptions
 33from avwx.base import AVWXBase
 34from avwx.flight_path import to_coordinates
 35from avwx.load_utils import LazyLoad
 36from avwx.parsing import core
 37from avwx.service.bulk import NOAA_Bulk, NOAA_Intl, Service
 38from avwx.static.core import CARDINAL_DEGREES, CARDINALS
 39from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES
 40from avwx.structs import (
 41    AirSigmetData,
 42    Bulletin,
 43    Code,
 44    Coord,
 45    Movement,
 46    Number,
 47    AirSigObservation,
 48    Timestamp,
 49    Units,
 50)
 51
 52try:
 53    from shapely.geometry import LineString  # type: ignore
 54except ModuleNotFoundError:
 55    LineString = None  # pylint: disable=invalid-name
 56
 57
 58class AirSigmet(AVWXBase):
 59    """
 60    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 61    other report when you supply the report string via `parse` or
 62    `from_report`.
 63
 64    ```python
 65    >>> from avwx import AirSigmet
 66    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 67    >>> sigmet = AirSigmet.from_report(report)
 68    True
 69    >>> sigmet.last_updated
 70    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 71    >>> sigmet.data.observation.coords
 72    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 73    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 74    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 75    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 76    >>> sigmet.data.observation.intensity
 77    Code(repr='NC', value='No change')
 78    >>> sigmet.data.observation.ceiling
 79    Number(repr='FL410', value=410, spoken='flight level four one zero')
 80    ```
 81    """
 82
 83    data: Optional[AirSigmetData] = None
 84
 85    def _post_parse(self) -> None:
 86        if self.raw:
 87            self.data, self.units = parse(self.raw, self.issued)
 88
 89    @staticmethod
 90    def sanitize(report: str) -> str:
 91        """Sanitizes the report string"""
 92        return sanitize(report)
 93
 94    def intersects(self, path: LineString) -> bool:
 95        """Returns True if the report area intersects a flight path"""
 96        if LineString is None:
 97            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
 98        if not self.data:
 99            return False
100        for data in (self.data.observation, self.data.forecast):
101            if data:
102                poly = data.poly
103                if poly and path.intersects(poly):
104                    return True
105        return False
106
107    def contains(self, coord: Coord) -> bool:
108        """Returns True if the report area contains a coordinate"""
109        if not self.data:
110            return False
111        for data in (self.data.observation, self.data.forecast):
112            if data:
113                poly = data.poly
114                if poly and coord.point.within(poly):
115                    return True
116        return False
117
118
119class AirSigManager:
120    """
121    Because of the global nature of these report types, we don't initialize a
122    report class with a station ident like the other report types. Instead, we
123    use a class to manage and update the list of all active SIGMET and AIRMET
124    reports.
125
126    ```python
127    >>> from avwx import AirSigManager
128    >>> from avwx.structs import Coord
129    >>> manager = AirSigManager()
130    >>> manager.update()
131    True
132    >>> manager.last_updated
133    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
134    >>> len(manager.reports)
135    113
136    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
137    5
138    >>> manager.reports[0].data.bulletin.type
139    Code(repr='WA', value='airmet')
140    >>> manager.reports[0].data.type
141    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
142    ```
143    """
144
145    _services: List[Service]
146    _raw: List[Tuple[str, Optional[str]]]
147    last_updated: Optional[datetime] = None
148    raw: List[str]
149    reports: Optional[List[AirSigmet]] = None
150
151    def __init__(self):  # type: ignore
152        self._services = [NOAA_Bulk("airsigmet"), NOAA_Intl("airsigmet")]
153        self._raw, self.raw = [], []
154
155    async def _update(
156        self, index: int, timeout: int
157    ) -> List[Tuple[str, Optional[str]]]:
158        source = self._services[index].root
159        reports = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
160        raw: List[Tuple[str, Optional[str]]] = [
161            (report, source) for report in reports if report
162        ]
163        return raw
164
165    def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
166        """Updates fetched reports and returns whether they've changed"""
167        return aio.run(self.async_update(timeout, disable_post))
168
169    async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
170        """Updates fetched reports and returns whether they've changed"""
171        coros = [self._update(i, timeout) for i in range(len(self._services))]
172        data = await aio.gather(*coros)
173        raw = list(chain.from_iterable(data))
174        reports = [i[0] for i in raw]
175        if raw == self._raw:
176            return False
177        self._raw, self.raw = raw, reports
178        self.last_updated = datetime.now(tz=timezone.utc)
179        # Parse reports if not disabled
180        if not disable_post:
181            parsed = []
182            for report, source in raw:
183                try:
184                    if obj := AirSigmet.from_report(report):
185                        obj.source = source
186                        parsed.append(obj)
187                except Exception as exc:  # pylint: disable=broad-except
188                    exceptions.exception_intercept(exc, raw={"report": report})
189            self.reports = parsed
190        return True
191
192    def along(self, coords: List[Coord]) -> List[AirSigmet]:
193        """Returns available reports the intersect a flight path"""
194        if LineString is None:
195            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
196        if self.reports is None:
197            return []
198        path = LineString([c.pair for c in coords])
199        return [r for r in self.reports if r.intersects(path)]
200
201    def contains(self, coord: Coord) -> List[AirSigmet]:
202        """Returns available reports that contain a coordinate"""
203        if self.reports is None:
204            return []
205        return [r for r in self.reports if r.contains(coord)]
206
207
208# N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141
209_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?")
210
211# FROM 60NW ISN-INL-TVC-GIJ-UIN-FSD-BIL-60NW ISN
212# FROM 70SSW ISN TO 20NNW FAR TO 70E DLH TO 40SE EAU TO 80SE RAP TO 40NNW BFF TO 70SSW
213_NAVAID_PATTERN = re.compile(
214    r"\b(\d{1,3}[NESW]{1,3} [A-z]{3}-?\b)|((-|(TO )|(FROM ))[A-z]{3}\b)"
215)
216
217# N OF N2050 AND S OF N2900
218_LATTERAL_PATTERN = re.compile(
219    r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b"
220)
221
222NAVAIDS = LazyLoad("navaids")
223
224# Used to assist parsing after sanitized. Removed after parse
225_FLAGS = {
226    "...": " <elip> ",
227    "..": " <elip> ",
228    ". ": " <break> ",
229    "/VIS ": " <vis> VIS ",
230}
231
232
233def _parse_prep(report: str) -> List[str]:
234    """Prepares sanitized string by replacing elements with flags"""
235    report = report.rstrip(".")
236    for key, val in _FLAGS.items():
237        report = report.replace(key, val)
238    return report.split()
239
240
241def _clean_flags(data: List[str]) -> List[str]:
242    return [i for i in data if i[0] != "<"]
243
244
245def _bulletin(value: str) -> Bulletin:
246    # if len(value) != 6:
247    #     return None
248    type_key = value[:2]
249    return Bulletin(
250        repr=value,
251        type=Code(repr=type_key, value=BULLETIN_TYPES[type_key]),
252        country=value[2:4],
253        number=int(value[4:]),
254    )
255
256
257def _header(data: List[str]) -> Tuple[List[str], Bulletin, str, str, Optional[str]]:
258    bulletin = _bulletin(data[0])
259    correction, end = (data[3], 4) if len(data[3]) == 3 else (None, 3)
260    return data[end:], bulletin, data[1], data[2], correction
261
262
263def _spacetime(
264    data: List[str],
265) -> Tuple[List[str], str, str, Optional[str], str, Optional[str]]:
266    area = data.pop(0)
267    # Skip airmet type + time repeat
268    if data[0] == "WA" and data[1].isdigit():
269        data = data[2:]
270        area = area[:-1]  # Remove type label from 3-letter ident
271    valid_index = data.index("VALID")
272    report_type = " ".join(data[:valid_index])
273    data = data[valid_index + 1 :]
274    if data[0] == "UNTIL":
275        start_time = None
276        end_time = data[1]
277        data = data[2:]
278    else:
279        target = "-" if "-" in data[0] else "/"
280        start_time, end_time = data.pop(0).split(target)
281    # KMCO- ORL FIR
282    if data[0][-1] == "-":
283        station = data.pop(0)[:-1]
284    # KMCO - KMCO
285    elif data[1] == "-" and len(data[0]) == 4:
286        station = data.pop(0)
287        data.pop(0)
288    else:
289        station = None
290    return data, area, report_type, start_time, end_time, station
291
292
293def _first_index(data: List[str], *targets: str) -> int:
294    for target in targets:
295        with suppress(ValueError):
296            return data.index(target)
297    return -1
298
299
300def _region(data: List[str]) -> Tuple[List[str], str]:
301    # FIR/CTA region name
302    # Or non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW
303    name_end = _first_index(data, "FIR", "CTA") + 1 or _first_index(data, "FROM")
304    # State list
305    if not name_end:
306        for item in data:
307            if len(item) == 2:
308                name_end += 1
309            else:
310                break
311    name = " ".join(data[:name_end])
312    return data[name_end:], name
313
314
315def _time(
316    data: List[str], issued: Optional[date] = None
317) -> Tuple[List[str], Optional[Timestamp], Optional[Timestamp]]:
318    """Extracts the start and/or end time based on a couple starting elements"""
319    index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK")
320    if index == -1:
321        return data, None, None
322    start_item = data.pop(index)
323    start, end, observed = None, None, None
324    if "-" in data[index]:
325        start_item, end_item = data.pop(index).split("-")
326        start = core.make_timestamp(
327            start_item, time_only=len(start_item) < 6, target_date=issued
328        )
329        end = core.make_timestamp(
330            end_item, time_only=len(end_item) < 6, target_date=issued
331        )
332    elif len(data[index]) >= 4 and data[index][:4].isdigit():
333        observed = core.make_timestamp(
334            data.pop(index), time_only=True, target_date=issued
335        )
336        if index > 0 and data[index - 1] == "OBS":
337            data.pop(index - 1)
338    for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"):
339        with suppress(ValueError):
340            data.remove(remv)
341    if observed:
342        if start_item in ("UNTIL", "VALID"):
343            end = observed
344        else:
345            start = observed
346    return data, start, end
347
348
349def _coord_value(value: str) -> float:
350    if value[0] in ("N", "S"):
351        index, strip, replace = 3, "N", "S"
352    else:
353        index, strip, replace = 4, "E", "W"
354    num = f"{value[:index]}.{value[index:]}".lstrip(strip).replace(replace, "-")
355    return float(num)
356
357
358def _position(data: List[str]) -> Tuple[List[str], Optional[Coord]]:
359    try:
360        index = data.index("PSN")
361    except ValueError:
362        return data, None
363    data.pop(index)
364    raw = f"{data[index]} {data[index + 1]}"
365    lat = _coord_value(data.pop(index))
366    lon = _coord_value(data.pop(index))
367    return data, Coord(lat=lat, lon=lon, repr=raw)
368
369
370def _movement(
371    data: List[str], units: Units
372) -> Tuple[List[str], Units, Optional[Movement]]:
373    with suppress(ValueError):
374        data.remove("STNR")
375        speed = core.make_number("STNR")
376        return data, units, Movement(repr="STNR", direction=None, speed=speed)
377    try:
378        index = data.index("MOV")
379    except ValueError:
380        return data, units, None
381    raw = data.pop(index)
382    direction_str = data.pop(index)
383    # MOV CNL
384    if direction_str == "CNL":
385        return data, units, None
386    raw += f" {direction_str} "
387    # MOV FROM 23040KT
388    if direction_str == "FROM":
389        value = data[index][:3]
390        raw += value
391        direction = core.make_number(value)
392        data[index] = data[index][3:]
393    # MOV E 45KMH
394    else:
395        direction = core.make_number(
396            direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES
397        )
398    speed = None
399    with suppress(IndexError):
400        kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH")
401        if kt_unit or kmh_unit:
402            units.wind_speed = "kmh" if kmh_unit else "kt"
403            speed_str = data.pop(index)
404            raw += speed_str
405            # Remove bottom speed Ex: MOV W 05-10KT
406            if "-" in speed_str:
407                speed_str = speed_str[speed_str.find("-") + 1 :]
408            speed = core.make_number(speed_str[: -3 if kmh_unit else -2])
409    return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed)
410
411
412def _info_from_match(match: re.Match, start: int) -> Tuple[str, int]:
413    """Returns the matching text and starting location if none yet available"""
414    if start == -1:
415        start = match.start()
416    return match.group(), start
417
418
419def _pre_break(report: str) -> str:
420    break_index = report.find(" <break> ")
421    return report[:break_index] if break_index != -1 else report
422
423
424def _bounds_from_latterals(report: str, start: int) -> Tuple[str, List[str], int]:
425    """Extract coordinate latterals from report Ex: N OF N2050"""
426    bounds = []
427    for match in _LATTERAL_PATTERN.finditer(_pre_break(report)):
428        group, start = _info_from_match(match, start)
429        # post 3.8 bounds.append(group.removesuffix(" AND"))
430        if group.endswith(" AND"):
431            group = group[:-4]
432        bounds.append(group)
433        report = report.replace(group, " ")
434    return report, bounds, start
435
436
437def _coords_from_text(report: str, start: int) -> Tuple[str, List[Coord], int]:
438    """Extract raw coordinate values from report Ex: N4409 E01506"""
439    coords = []
440    for match in _COORD_PATTERN.finditer(_pre_break(report)):
441        group, start = _info_from_match(match, start)
442        text = group.strip(" -")
443        lat, lon = text.split()
444        coord = Coord(lat=_coord_value(lat), lon=_coord_value(lon), repr=text)
445        coords.append(coord)
446        report = report.replace(group, " ")
447    return report, coords, start
448
449
450def _coords_from_navaids(report: str, start: int) -> Tuple[str, List[Coord], int]:
451    """Extract navaid referenced coordinates from report Ex: 30SSW BNA"""
452    # pylint: disable=too-many-locals
453    coords, navs = [], []
454    for match in _NAVAID_PATTERN.finditer(_pre_break(report)):
455        group, start = _info_from_match(match, start)
456        report = report.replace(group, " ")
457        group = group.strip("-")  # post 3.8 .removeprefix("FROM ").removeprefix("TO ")
458        for end in ("FROM", "TO"):
459            if group.startswith(f"{end} "):
460                group = group[(len(end) + 1) :]
461        navs.append((group, *group.split()))
462    locs = to_coordinates([n[2 if len(n) == 3 else 1] for n in navs])
463    for i, nav in enumerate(navs):
464        value = nav[0]
465        if len(nav) == 3:
466            vector, num_index = nav[1], 0
467            while vector[num_index].isdigit():
468                num_index += 1
469            distance, bearing = (
470                int(vector[:num_index]),
471                CARDINAL_DEGREES[vector[num_index:]],
472            )
473            loc = geo_distance(nautical=distance).destination(
474                locs[i].pair, bearing=bearing
475            )
476            coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=value)
477        else:
478            coord = locs[i]
479            coord.repr = value
480        coords.append(coord)
481    return report, coords, start
482
483
484def _bounds(data: List[str]) -> Tuple[List[str], List[Coord], List[str]]:
485    """Extract coordinate bounds by coord, navaid, and latterals"""
486    report, start = " ".join(data), -1
487    report, bounds, start = _bounds_from_latterals(report, start)
488    report, coords, start = _coords_from_text(report, start)
489    report, navs, start = _coords_from_navaids(report, start)
490    coords += navs
491    for target in ("FROM", "WI", "BOUNDED", "OBS"):
492        index = report.find(f"{target} ")
493        if index != -1 and index < start:
494            start = index
495    report = report[:start] + report[report.rfind("  ") :]
496    data = [s for s in report.split() if s]
497    return data, coords, bounds
498
499
500def _altitudes(
501    data: List[str], units: Units
502) -> Tuple[List[str], Units, Optional[Number], Optional[Number]]:
503    """Extract the floor and ceiling altitudes"""
504    floor, ceiling = None, None
505    for i, item in enumerate(data):
506        # BTN FL180 AND FL330
507        if item == "BTN" and len(data) > i + 2 and data[i + 2] == "AND":
508            floor, units = core.make_altitude(data[i + 1], units)
509            ceiling, units = core.make_altitude(data[i + 3], units)
510            data = data[:i] + data[i + 4 :]
511            break
512        # TOPS ABV FL450
513        if item in ("TOP", "TOPS", "BLW"):
514            if data[i + 1] == "ABV":
515                ceiling = core.make_number(f"ABV {data[i + 2]}")
516                data = data[:i] + data[i + 3 :]
517                break
518            if data[i + 1] == "BLW":
519                ceiling = core.make_number(f"BLW {data[i + 2]}")
520                data = data[:i] + data[i + 3 :]
521                break
522            # TOPS TO FL310
523            if data[i + 1] == "TO":
524                data.pop(i)
525            ceiling, units = core.make_altitude(data[i + 1], units)
526            data = data[:i] + data[i + 2 :]
527            # CIG BLW 010
528            if data[i - 1] == "CIG":
529                data.pop(i - 1)
530            break
531        # FL060/300 SFC/FL160
532        if core.is_altitude(item):
533            if "/" in item:
534                floor_val, ceiling_val = item.split("/")
535                floor, units = core.make_altitude(floor_val, units)
536                if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[
537                    :2
538                ] != "FL":
539                    ceiling, units = core.make_altitude(
540                        ceiling_val, units, force_fl=True
541                    )
542                else:
543                    ceiling, units = core.make_altitude(ceiling_val, units)
544            else:
545                ceiling, units = core.make_altitude(item, units)
546            data.pop(i)
547            break
548    return data, units, floor, ceiling
549
550
551def _weather_type(data: List[str]) -> Tuple[List[str], Optional[Code]]:
552    weather = None
553    report = " ".join(data)
554    for key, val in WEATHER_TYPES.items():
555        if key in report:
556            weather = Code(repr=key, value=val)
557            data = [i for i in report.replace(key, "").split() if i]
558            break
559    return data, weather
560
561
562def _intensity(data: List[str]) -> Tuple[List[str], Optional[Code]]:
563    if not data:
564        return data, None
565    try:
566        value = INTENSITY[data[-1]]
567        code = data.pop()
568        return data, Code(repr=code, value=value)
569    except KeyError:
570        return data, None
571
572
573def _sigmet_observation(
574    data: List[str], units: Units, issued: Optional[date] = None
575) -> Tuple[AirSigObservation, Units]:
576    data, start_time, end_time = _time(data, issued)
577    data, position = _position(data)
578    data, coords, bounds = _bounds(data)
579    data, units, movement = _movement(data, units)
580    data, intensity = _intensity(data)
581    data, units, floor, ceiling = _altitudes(data, units)
582    data, weather = _weather_type(data)
583    struct = AirSigObservation(
584        type=weather,
585        start_time=start_time,
586        end_time=end_time,
587        position=position,
588        floor=floor,
589        ceiling=ceiling,
590        coords=coords,
591        bounds=bounds,
592        movement=movement,
593        intensity=intensity,
594        other=_clean_flags(data),
595    )
596    return struct, units
597
598
599def _observations(
600    data: List[str], units: Units, issued: Optional[date] = None
601) -> Tuple[Units, Optional[AirSigObservation], Optional[AirSigObservation]]:
602    observation, forecast, forecast_index = None, None, -1
603    forecast_index = _first_index(data, "FCST", "OUTLOOK", "OTLK")
604    if forecast_index == -1:
605        observation, units = _sigmet_observation(data, units, issued)
606    # 6 is arbitrary. Will likely change or be more precise later
607    elif forecast_index < 6:
608        forecast, units = _sigmet_observation(data, units, issued)
609    else:
610        observation, units = _sigmet_observation(data[:forecast_index], units, issued)
611        forecast, units = _sigmet_observation(data[forecast_index:], units, issued)
612    return units, observation, forecast
613
614
615_REPLACE = {
616    " MO V ": " MOV ",
617    " STNRY": " STNR",
618    " STCNRY": " STNR",
619    " N-NE ": " NNE ",
620    " N-NW ": " NNW ",
621    " E-NE ": " ENE ",
622    " E-SE ": " ESE ",
623    " S-SE ": " SSE ",
624    " S-SW ": " SSW ",
625    " W-SW ": " WSW ",
626    " W-NW ": " WNW ",
627}
628
629
630def _find_first_digit(item: str) -> int:
631    return next((i for i, char in enumerate(item) if char.isdigit()), -1)
632
633
634def sanitize(report: str) -> str:
635    """Sanitized AIRMET / SIGMET report string"""
636    report = report.strip(" =")
637    for key, val in _REPLACE.items():
638        report = report.replace(key, val)
639    data = report.split()
640    for i, item in reversed(list(enumerate(data))):
641        # Remove extra element on altitude Ex: FL450Z skip 1000FT
642        if (
643            len(item) > 4
644            and not item[-1].isdigit()
645            and item[-2:] != "FT"
646            and item[-1] != "M"
647            and core.is_altitude(item[:-1])
648        ):
649            data[i] = item[:-1]
650        # Split attached movement direction Ex: NE05KT
651        if (
652            len(item) > 4
653            and (item.endswith("KT") or item.endswith("KMH"))
654            and item[: _find_first_digit(item)] in CARDINALS
655        ):
656            index = _find_first_digit(item)
657            direction = item[:index]
658            data.insert(i + 1, item[index:])
659            data[i] = direction
660    return " ".join(data)
661
662
663def parse(report: str, issued: Optional[date] = None) -> Tuple[AirSigmetData, Units]:
664    """Parse AIRMET / SIGMET report string"""
665    # pylint: disable=too-many-locals
666    units = Units.international()
667    sanitized = sanitize(report)
668    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
669    data, area, report_type, start_time, end_time, station = _spacetime(data)
670    body = sanitized[sanitized.find(" ".join(data[:2])) :]
671    # Trim AIRMET type
672    if data[0] == "AIRMET":
673        with suppress(ValueError):
674            data = data[data.index("<elip>") + 1 :]
675    data, region = _region(data)
676    units, observation, forecast = _observations(data, units, issued)
677    struct = AirSigmetData(
678        raw=report,
679        sanitized=sanitized,
680        station=station,
681        time=core.make_timestamp(time, target_date=issued),
682        remarks=None,
683        bulletin=bulletin,
684        issuer=issuer,
685        correction=correction,
686        area=area,
687        type=report_type,
688        start_time=core.make_timestamp(start_time, target_date=issued),
689        end_time=core.make_timestamp(end_time, target_date=issued),
690        body=body,
691        region=region,
692        observation=observation,
693        forecast=forecast,
694    )
695    return struct, units
class AirSigmet(avwx.base.AVWXBase):
 59class AirSigmet(AVWXBase):
 60    """
 61    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 62    other report when you supply the report string via `parse` or
 63    `from_report`.
 64
 65    ```python
 66    >>> from avwx import AirSigmet
 67    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 68    >>> sigmet = AirSigmet.from_report(report)
 69    True
 70    >>> sigmet.last_updated
 71    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 72    >>> sigmet.data.observation.coords
 73    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 74    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 75    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 76    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 77    >>> sigmet.data.observation.intensity
 78    Code(repr='NC', value='No change')
 79    >>> sigmet.data.observation.ceiling
 80    Number(repr='FL410', value=410, spoken='flight level four one zero')
 81    ```
 82    """
 83
 84    data: Optional[AirSigmetData] = None
 85
 86    def _post_parse(self) -> None:
 87        if self.raw:
 88            self.data, self.units = parse(self.raw, self.issued)
 89
 90    @staticmethod
 91    def sanitize(report: str) -> str:
 92        """Sanitizes the report string"""
 93        return sanitize(report)
 94
 95    def intersects(self, path: LineString) -> bool:
 96        """Returns True if the report area intersects a flight path"""
 97        if LineString is None:
 98            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
 99        if not self.data:
100            return False
101        for data in (self.data.observation, self.data.forecast):
102            if data:
103                poly = data.poly
104                if poly and path.intersects(poly):
105                    return True
106        return False
107
108    def contains(self, coord: Coord) -> bool:
109        """Returns True if the report area contains a coordinate"""
110        if not self.data:
111            return False
112        for data in (self.data.observation, self.data.forecast):
113            if data:
114                poly = data.poly
115                if poly and coord.point.within(poly):
116                    return True
117        return False

In addition to the manager, you can use the avwx.AirSigmet class like any other report when you supply the report string via parse or from_report.

>>> from avwx import AirSigmet
>>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
>>> sigmet = AirSigmet.from_report(report)
True
>>> sigmet.last_updated
datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
>>> sigmet.data.observation.coords
[Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
>>> sigmet.data.observation.intensity
Code(repr='NC', value='No change')
>>> sigmet.data.observation.ceiling
Number(repr='FL410', value=410, spoken='flight level four one zero')
data: Optional[avwx.structs.AirSigmetData] = None
@staticmethod
def sanitize(report: str) -> str:
90    @staticmethod
91    def sanitize(report: str) -> str:
92        """Sanitizes the report string"""
93        return sanitize(report)

Sanitizes the report string

def intersects(self, path: shapely.geometry.linestring.LineString) -> bool:
 95    def intersects(self, path: LineString) -> bool:
 96        """Returns True if the report area intersects a flight path"""
 97        if LineString is None:
 98            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
 99        if not self.data:
100            return False
101        for data in (self.data.observation, self.data.forecast):
102            if data:
103                poly = data.poly
104                if poly and path.intersects(poly):
105                    return True
106        return False

Returns True if the report area intersects a flight path

def contains(self, coord: avwx.structs.Coord) -> bool:
108    def contains(self, coord: Coord) -> bool:
109        """Returns True if the report area contains a coordinate"""
110        if not self.data:
111            return False
112        for data in (self.data.observation, self.data.forecast):
113            if data:
114                poly = data.poly
115                if poly and coord.point.within(poly):
116                    return True
117        return False

Returns True if the report area contains a coordinate

class AirSigManager:
120class AirSigManager:
121    """
122    Because of the global nature of these report types, we don't initialize a
123    report class with a station ident like the other report types. Instead, we
124    use a class to manage and update the list of all active SIGMET and AIRMET
125    reports.
126
127    ```python
128    >>> from avwx import AirSigManager
129    >>> from avwx.structs import Coord
130    >>> manager = AirSigManager()
131    >>> manager.update()
132    True
133    >>> manager.last_updated
134    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
135    >>> len(manager.reports)
136    113
137    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
138    5
139    >>> manager.reports[0].data.bulletin.type
140    Code(repr='WA', value='airmet')
141    >>> manager.reports[0].data.type
142    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
143    ```
144    """
145
146    _services: List[Service]
147    _raw: List[Tuple[str, Optional[str]]]
148    last_updated: Optional[datetime] = None
149    raw: List[str]
150    reports: Optional[List[AirSigmet]] = None
151
152    def __init__(self):  # type: ignore
153        self._services = [NOAA_Bulk("airsigmet"), NOAA_Intl("airsigmet")]
154        self._raw, self.raw = [], []
155
156    async def _update(
157        self, index: int, timeout: int
158    ) -> List[Tuple[str, Optional[str]]]:
159        source = self._services[index].root
160        reports = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
161        raw: List[Tuple[str, Optional[str]]] = [
162            (report, source) for report in reports if report
163        ]
164        return raw
165
166    def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
167        """Updates fetched reports and returns whether they've changed"""
168        return aio.run(self.async_update(timeout, disable_post))
169
170    async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
171        """Updates fetched reports and returns whether they've changed"""
172        coros = [self._update(i, timeout) for i in range(len(self._services))]
173        data = await aio.gather(*coros)
174        raw = list(chain.from_iterable(data))
175        reports = [i[0] for i in raw]
176        if raw == self._raw:
177            return False
178        self._raw, self.raw = raw, reports
179        self.last_updated = datetime.now(tz=timezone.utc)
180        # Parse reports if not disabled
181        if not disable_post:
182            parsed = []
183            for report, source in raw:
184                try:
185                    if obj := AirSigmet.from_report(report):
186                        obj.source = source
187                        parsed.append(obj)
188                except Exception as exc:  # pylint: disable=broad-except
189                    exceptions.exception_intercept(exc, raw={"report": report})
190            self.reports = parsed
191        return True
192
193    def along(self, coords: List[Coord]) -> List[AirSigmet]:
194        """Returns available reports the intersect a flight path"""
195        if LineString is None:
196            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
197        if self.reports is None:
198            return []
199        path = LineString([c.pair for c in coords])
200        return [r for r in self.reports if r.intersects(path)]
201
202    def contains(self, coord: Coord) -> List[AirSigmet]:
203        """Returns available reports that contain a coordinate"""
204        if self.reports is None:
205            return []
206        return [r for r in self.reports if r.contains(coord)]

Because of the global nature of these report types, we don't initialize a report class with a station ident like the other report types. Instead, we use a class to manage and update the list of all active SIGMET and AIRMET reports.

>>> from avwx import AirSigManager
>>> from avwx.structs import Coord
>>> manager = AirSigManager()
>>> manager.update()
True
>>> manager.last_updated
datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
>>> len(manager.reports)
113
>>> len(manager.contains(Coord(lat=33.12, lon=-105)))
5
>>> manager.reports[0].data.bulletin.type
Code(repr='WA', value='airmet')
>>> manager.reports[0].data.type
'AIRMET SIERRA FOR IFR AND MTN OBSCN'
last_updated: Optional[datetime.datetime] = None
raw: List[str]
reports: Optional[List[AirSigmet]] = None
def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
166    def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
167        """Updates fetched reports and returns whether they've changed"""
168        return aio.run(self.async_update(timeout, disable_post))

Updates fetched reports and returns whether they've changed

async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
170    async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
171        """Updates fetched reports and returns whether they've changed"""
172        coros = [self._update(i, timeout) for i in range(len(self._services))]
173        data = await aio.gather(*coros)
174        raw = list(chain.from_iterable(data))
175        reports = [i[0] for i in raw]
176        if raw == self._raw:
177            return False
178        self._raw, self.raw = raw, reports
179        self.last_updated = datetime.now(tz=timezone.utc)
180        # Parse reports if not disabled
181        if not disable_post:
182            parsed = []
183            for report, source in raw:
184                try:
185                    if obj := AirSigmet.from_report(report):
186                        obj.source = source
187                        parsed.append(obj)
188                except Exception as exc:  # pylint: disable=broad-except
189                    exceptions.exception_intercept(exc, raw={"report": report})
190            self.reports = parsed
191        return True

Updates fetched reports and returns whether they've changed

def along( self, coords: List[avwx.structs.Coord]) -> List[AirSigmet]:
193    def along(self, coords: List[Coord]) -> List[AirSigmet]:
194        """Returns available reports the intersect a flight path"""
195        if LineString is None:
196            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
197        if self.reports is None:
198            return []
199        path = LineString([c.pair for c in coords])
200        return [r for r in self.reports if r.intersects(path)]

Returns available reports the intersect a flight path

def contains( self, coord: avwx.structs.Coord) -> List[AirSigmet]:
202    def contains(self, coord: Coord) -> List[AirSigmet]:
203        """Returns available reports that contain a coordinate"""
204        if self.reports is None:
205            return []
206        return [r for r in self.reports if r.contains(coord)]

Returns available reports that contain a coordinate

def sanitize(report: str) -> str:
635def sanitize(report: str) -> str:
636    """Sanitized AIRMET / SIGMET report string"""
637    report = report.strip(" =")
638    for key, val in _REPLACE.items():
639        report = report.replace(key, val)
640    data = report.split()
641    for i, item in reversed(list(enumerate(data))):
642        # Remove extra element on altitude Ex: FL450Z skip 1000FT
643        if (
644            len(item) > 4
645            and not item[-1].isdigit()
646            and item[-2:] != "FT"
647            and item[-1] != "M"
648            and core.is_altitude(item[:-1])
649        ):
650            data[i] = item[:-1]
651        # Split attached movement direction Ex: NE05KT
652        if (
653            len(item) > 4
654            and (item.endswith("KT") or item.endswith("KMH"))
655            and item[: _find_first_digit(item)] in CARDINALS
656        ):
657            index = _find_first_digit(item)
658            direction = item[:index]
659            data.insert(i + 1, item[index:])
660            data[i] = direction
661    return " ".join(data)

Sanitized AIRMET / SIGMET report string

def parse( report: str, issued: Optional[datetime.date] = None) -> Tuple[avwx.structs.AirSigmetData, avwx.structs.Units]:
664def parse(report: str, issued: Optional[date] = None) -> Tuple[AirSigmetData, Units]:
665    """Parse AIRMET / SIGMET report string"""
666    # pylint: disable=too-many-locals
667    units = Units.international()
668    sanitized = sanitize(report)
669    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
670    data, area, report_type, start_time, end_time, station = _spacetime(data)
671    body = sanitized[sanitized.find(" ".join(data[:2])) :]
672    # Trim AIRMET type
673    if data[0] == "AIRMET":
674        with suppress(ValueError):
675            data = data[data.index("<elip>") + 1 :]
676    data, region = _region(data)
677    units, observation, forecast = _observations(data, units, issued)
678    struct = AirSigmetData(
679        raw=report,
680        sanitized=sanitized,
681        station=station,
682        time=core.make_timestamp(time, target_date=issued),
683        remarks=None,
684        bulletin=bulletin,
685        issuer=issuer,
686        correction=correction,
687        area=area,
688        type=report_type,
689        start_time=core.make_timestamp(start_time, target_date=issued),
690        end_time=core.make_timestamp(end_time, target_date=issued),
691        body=body,
692        region=region,
693        observation=observation,
694        forecast=forecast,
695    )
696    return struct, units

Parse AIRMET / SIGMET report string