avwx.current.airsigmet

A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. They are divided into:

  • Convective - thunderstorms, hail, and cyclones
  • Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation

An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. They are divided into:

  • Sierra - IFR conditions like low ceilings and mountain obscuration
  • Tango - turbulence and high surface winds
  • Zulu - icing and freezing levels

Both types share a similar report format and therefore are combined into a single handling class. The Bulletin and weather type can be used to classify each as a SIGMET or AIRMET for filtering purposes.

  1"""
  2A SIGMET (Significant Meteorological Information) is a weather advisory for the
  3safety of all aircraft. They are divided into:
  4
  5- Convective - thunderstorms, hail, and cyclones
  6- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation
  7
  8An AIRMET (Airman's Meteorological Information) is a weather advisory for
  9smaller aircraft or VFR navigation. They are divided into:
 10
 11- Sierra - IFR conditions like low ceilings and mountain obscuration
 12- Tango - turbulence and high surface winds
 13- Zulu - icing and freezing levels
 14
 15Both types share a similar report format and therefore are combined into a
 16single handling class. The `Bulletin` and weather type can be used to classify
 17each as a SIGMET or AIRMET for filtering purposes.
 18"""
 19
 20# stdlib
 21import asyncio as aio
 22import re
 23from contextlib import suppress
 24from datetime import date, datetime, timezone
 25from itertools import chain
 26from typing import List, Optional, Tuple
 27
 28# library
 29from geopy.distance import distance as geo_distance  # type: ignore
 30
 31# module
 32from avwx import exceptions
 33from avwx.base import AVWXBase
 34from avwx.flight_path import to_coordinates
 35from avwx.load_utils import LazyLoad
 36from avwx.parsing import core
 37from avwx.service.bulk import NOAA_Bulk, NOAA_Intl, Service
 38from avwx.static.core import CARDINAL_DEGREES, CARDINALS
 39from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES
 40from avwx.structs import (
 41    AirSigmetData,
 42    Bulletin,
 43    Code,
 44    Coord,
 45    Movement,
 46    Number,
 47    AirSigObservation,
 48    Timestamp,
 49    Units,
 50)
 51
 52try:
 53    from shapely.geometry import LineString  # type: ignore
 54except ModuleNotFoundError:
 55    LineString = None  # pylint: disable=invalid-name
 56
 57
 58class AirSigmet(AVWXBase):
 59    """
 60    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 61    other report when you supply the report string via `parse` or
 62    `from_report`.
 63
 64    ```python
 65    >>> from avwx import AirSigmet
 66    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 67    >>> sigmet = AirSigmet.from_report(report)
 68    True
 69    >>> sigmet.last_updated
 70    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 71    >>> sigmet.data.observation.coords
 72    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 73    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 74    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 75    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 76    >>> sigmet.data.observation.intensity
 77    Code(repr='NC', value='No change')
 78    >>> sigmet.data.observation.ceiling
 79    Number(repr='FL410', value=410, spoken='flight level four one zero')
 80    ```
 81    """
 82
 83    data: Optional[AirSigmetData] = None
 84
 85    def _post_parse(self) -> None:
 86        if self.raw:
 87            self.data, self.units = parse(self.raw, self.issued)
 88
 89    @staticmethod
 90    def sanitize(report: str) -> str:
 91        """Sanitizes the report string"""
 92        return sanitize(report)
 93
 94    def intersects(self, path: LineString) -> bool:
 95        """Returns True if the report area intersects a flight path"""
 96        if LineString is None:
 97            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
 98        if not self.data:
 99            return False
100        for data in (self.data.observation, self.data.forecast):
101            if data:
102                poly = data.poly
103                if poly and path.intersects(poly):
104                    return True
105        return False
106
107    def contains(self, coord: Coord) -> bool:
108        """Returns True if the report area contains a coordinate"""
109        if not self.data:
110            return False
111        for data in (self.data.observation, self.data.forecast):
112            if data:
113                poly = data.poly
114                if poly and coord.point.within(poly):
115                    return True
116        return False
117
118
119class AirSigManager:
120    """
121    Because of the global nature of these report types, we don't initialize a
122    report class with a station ident like the other report types. Instead, we
123    use a class to manage and update the list of all active SIGMET and AIRMET
124    reports.
125
126    ```python
127    >>> from avwx import AirSigManager
128    >>> from avwx.structs import Coord
129    >>> manager = AirSigManager()
130    >>> manager.update()
131    True
132    >>> manager.last_updated
133    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
134    >>> len(manager.reports)
135    113
136    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
137    5
138    >>> manager.reports[0].data.bulletin.type
139    Code(repr='WA', value='airmet')
140    >>> manager.reports[0].data.type
141    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
142    ```
143    """
144
145    _services: List[Service]
146    _raw: List[Tuple[str, str]]
147    last_updated: Optional[datetime] = None
148    raw: List[str]
149    reports: Optional[List[AirSigmet]] = None
150
151    def __init__(self):  # type: ignore
152        self._services = [NOAA_Bulk("airsigmet"), NOAA_Intl("airsigmet")]
153        self._raw, self.raw = [], []
154
155    async def _update(
156        self, index: int, timeout: int
157    ) -> List[Tuple[str, Optional[str]]]:
158        source = self._services[index].root
159        reports = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
160        raw: List[Tuple[str, Optional[str]]] = [
161            (report, source) for report in reports if report
162        ]
163        return raw
164
165    def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
166        """Updates fetched reports and returns whether they've changed"""
167        return aio.run(self.async_update(timeout, disable_post))
168
169    async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
170        """Updates fetched reports and returns whether they've changed"""
171        coros = [self._update(i, timeout) for i in range(len(self._services))]
172        data = await aio.gather(*coros)
173        raw = list(chain.from_iterable(data))
174        reports = [i[0] for i in raw]
175        changed = raw != self.raw
176        if changed:
177            self._raw, self.raw = raw, reports
178            self.last_updated = datetime.now(tz=timezone.utc)
179        # Parse reports if not disabled
180        if not disable_post:
181            parsed = []
182            for report, source in raw:
183                try:
184                    if obj := AirSigmet.from_report(report):
185                        obj.source = source
186                        parsed.append(obj)
187                except Exception as exc:  # pylint: disable=broad-except
188                    exceptions.exception_intercept(exc, raw=report)
189            self.reports = parsed
190        return changed
191
192    def along(self, coords: List[Coord]) -> List[AirSigmet]:
193        """Returns available reports the intersect a flight path"""
194        if LineString is None:
195            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
196        if self.reports is None:
197            return []
198        path = LineString([c.pair for c in coords])
199        return [r for r in self.reports if r.intersects(path)]
200
201    def contains(self, coord: Coord) -> List[AirSigmet]:
202        """Returns available reports that contain a coordinate"""
203        if self.reports is None:
204            return []
205        return [r for r in self.reports if r.contains(coord)]
206
207
208# N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141
209_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?")
210
211# FROM 60NW ISN-INL-TVC-GIJ-UIN-FSD-BIL-60NW ISN
212# FROM 70SSW ISN TO 20NNW FAR TO 70E DLH TO 40SE EAU TO 80SE RAP TO 40NNW BFF TO 70SSW
213_NAVAID_PATTERN = re.compile(
214    r"\b(\d{1,3}[NESW]{1,3} [A-z]{3}-?\b)|((-|(TO )|(FROM ))[A-z]{3}\b)"
215)
216
217# N OF N2050 AND S OF N2900
218_LATTERAL_PATTERN = re.compile(
219    r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b"
220)
221
222NAVAIDS = LazyLoad("navaids")
223
224# Used to assist parsing after sanitized. Removed after parse
225_FLAGS = {
226    "...": " <elip> ",
227    "..": " <elip> ",
228    ". ": " <break> ",
229    "/VIS ": " <vis> VIS ",
230}
231
232
233def _parse_prep(report: str) -> List[str]:
234    """Prepares sanitized string by replacing elements with flags"""
235    report = report.rstrip(".")
236    for key, val in _FLAGS.items():
237        report = report.replace(key, val)
238    return report.split()
239
240
241def _clean_flags(data: List[str]) -> List[str]:
242    return [i for i in data if i[0] != "<"]
243
244
245def _bulletin(value: str) -> Bulletin:
246    # if len(value) != 6:
247    #     return None
248    type_key = value[:2]
249    return Bulletin(
250        repr=value,
251        type=Code(repr=type_key, value=BULLETIN_TYPES[type_key]),
252        country=value[2:4],
253        number=int(value[4:]),
254    )
255
256
257def _header(data: List[str]) -> Tuple[List[str], Bulletin, str, str, Optional[str]]:
258    bulletin = _bulletin(data[0])
259    correction, end = (data[3], 4) if len(data[3]) == 3 else (None, 3)
260    return data[end:], bulletin, data[1], data[2], correction
261
262
263def _spacetime(
264    data: List[str],
265) -> Tuple[List[str], str, str, Optional[str], str, Optional[str]]:
266    area = data.pop(0)
267    # Skip airmet type + time repeat
268    if data[0] == "WA" and data[1].isdigit():
269        data = data[2:]
270        area = area[:-1]  # Remove type label from 3-letter ident
271    valid_index = data.index("VALID")
272    report_type = " ".join(data[:valid_index])
273    data = data[valid_index + 1 :]
274    if data[0] == "UNTIL":
275        start_time = None
276        end_time = data[1]
277        data = data[2:]
278    else:
279        target = "-" if "-" in data[0] else "/"
280        start_time, end_time = data.pop(0).split(target)
281    # KMCO- ORL FIR
282    if data[0][-1] == "-":
283        station = data.pop(0)[:-1]
284    # KMCO - KMCO
285    elif data[1] == "-" and len(data[0]) == 4:
286        station = data.pop(0)
287        data.pop(0)
288    else:
289        station = None
290    return data, area, report_type, start_time, end_time, station
291
292
293def _first_index(data: List[str], *targets: str) -> int:
294    for target in targets:
295        with suppress(ValueError):
296            return data.index(target)
297    return -1
298
299
300def _region(data: List[str]) -> Tuple[List[str], str]:
301    # FIR/CTA region name
302    name_end = _first_index(data, "FIR", "CTA") + 1
303    # Non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW
304    if not name_end:
305        name_end = _first_index(data, "FROM")
306    # State list
307    if not name_end:
308        for item in data:
309            if len(item) == 2:
310                name_end += 1
311            else:
312                break
313    name = " ".join(data[:name_end])
314    return data[name_end:], name
315
316
317def _time(
318    data: List[str], issued: Optional[date] = None
319) -> Tuple[List[str], Optional[Timestamp], Optional[Timestamp]]:
320    """Extracts the start and/or end time based on a couple starting elements"""
321    index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK")
322    if index == -1:
323        return data, None, None
324    start_item = data.pop(index)
325    start, end, observed = None, None, None
326    if "-" in data[index]:
327        start_item, end_item = data.pop(index).split("-")
328        start = core.make_timestamp(
329            start_item, time_only=len(start_item) < 6, target_date=issued
330        )
331        end = core.make_timestamp(
332            end_item, time_only=len(end_item) < 6, target_date=issued
333        )
334    elif len(data[index]) >= 4 and data[index][:4].isdigit():
335        observed = core.make_timestamp(
336            data.pop(index), time_only=True, target_date=issued
337        )
338        if index > 0 and data[index - 1] == "OBS":
339            data.pop(index - 1)
340    for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"):
341        with suppress(ValueError):
342            data.remove(remv)
343    if observed:
344        if start_item in ("UNTIL", "VALID"):
345            end = observed
346        else:
347            start = observed
348    return data, start, end
349
350
351def _coord_value(value: str) -> float:
352    if value[0] in ("N", "S"):
353        index, strip, replace = 3, "N", "S"
354    else:
355        index, strip, replace = 4, "E", "W"
356    num = f"{value[:index]}.{value[index:]}".lstrip(strip).replace(replace, "-")
357    return float(num)
358
359
360def _position(data: List[str]) -> Tuple[List[str], Optional[Coord]]:
361    try:
362        index = data.index("PSN")
363    except ValueError:
364        return data, None
365    data.pop(index)
366    raw = f"{data[index]} {data[index + 1]}"
367    lat = _coord_value(data.pop(index))
368    lon = _coord_value(data.pop(index))
369    return data, Coord(lat=lat, lon=lon, repr=raw)
370
371
372def _movement(
373    data: List[str], units: Units
374) -> Tuple[List[str], Units, Optional[Movement]]:
375    with suppress(ValueError):
376        data.remove("STNR")
377        speed = core.make_number("STNR")
378        return data, units, Movement(repr="STNR", direction=None, speed=speed)
379    try:
380        index = data.index("MOV")
381    except ValueError:
382        return data, units, None
383    raw = data.pop(index)
384    direction_str = data.pop(index)
385    # MOV CNL
386    if direction_str == "CNL":
387        return data, units, None
388    raw += f" {direction_str} "
389    # MOV FROM 23040KT
390    if direction_str == "FROM":
391        value = data[index][:3]
392        raw += value
393        direction = core.make_number(value)
394        data[index] = data[index][3:]
395    # MOV E 45KMH
396    else:
397        direction = core.make_number(
398            direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES
399        )
400    speed = None
401    with suppress(IndexError):
402        kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH")
403        if kt_unit or kmh_unit:
404            units.wind_speed = "kmh" if kmh_unit else "kt"
405            speed_str = data.pop(index)
406            raw += speed_str
407            # Remove bottom speed Ex: MOV W 05-10KT
408            if "-" in speed_str:
409                speed_str = speed_str[speed_str.find("-") + 1 :]
410            speed = core.make_number(speed_str[: -3 if kmh_unit else -2])
411    return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed)
412
413
414def _info_from_match(match: re.Match, start: int) -> Tuple[str, int]:
415    """Returns the matching text and starting location if none yet available"""
416    if start == -1:
417        start = match.start()
418    return match.group(), start
419
420
421def _pre_break(report: str) -> str:
422    break_index = report.find(" <break> ")
423    return report[:break_index] if break_index != -1 else report
424
425
426def _bounds_from_latterals(report: str, start: int) -> Tuple[str, List[str], int]:
427    """Extract coordinate latterals from report Ex: N OF N2050"""
428    bounds = []
429    for match in _LATTERAL_PATTERN.finditer(_pre_break(report)):
430        group, start = _info_from_match(match, start)
431        # post 3.8 bounds.append(group.removesuffix(" AND"))
432        if group.endswith(" AND"):
433            group = group[:-4]
434        bounds.append(group)
435        report = report.replace(group, " ")
436    return report, bounds, start
437
438
439def _coords_from_text(report: str, start: int) -> Tuple[str, List[Coord], int]:
440    """Extract raw coordinate values from report Ex: N4409 E01506"""
441    coords = []
442    for match in _COORD_PATTERN.finditer(_pre_break(report)):
443        group, start = _info_from_match(match, start)
444        text = group.strip(" -")
445        lat, lon = text.split()
446        coord = Coord(lat=_coord_value(lat), lon=_coord_value(lon), repr=text)
447        coords.append(coord)
448        report = report.replace(group, " ")
449    return report, coords, start
450
451
452def _coords_from_navaids(report: str, start: int) -> Tuple[str, List[Coord], int]:
453    """Extract navaid referenced coordinates from report Ex: 30SSW BNA"""
454    # pylint: disable=too-many-locals
455    coords, navs = [], []
456    for match in _NAVAID_PATTERN.finditer(_pre_break(report)):
457        group, start = _info_from_match(match, start)
458        report = report.replace(group, " ")
459        group = group.strip("-")  # post 3.8 .removeprefix("FROM ").removeprefix("TO ")
460        for end in ("FROM", "TO"):
461            if group.startswith(f"{end} "):
462                group = group[(len(end) + 1) :]
463        navs.append((group, *group.split()))
464    locs = to_coordinates([n[2 if len(n) == 3 else 1] for n in navs])
465    for i, nav in enumerate(navs):
466        value = nav[0]
467        if len(nav) == 3:
468            vector, num_index = nav[1], 0
469            while vector[num_index].isdigit():
470                num_index += 1
471            distance, bearing = (
472                int(vector[:num_index]),
473                CARDINAL_DEGREES[vector[num_index:]],
474            )
475            loc = geo_distance(nautical=distance).destination(
476                locs[i].pair, bearing=bearing
477            )
478            coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=value)
479        else:
480            coord = locs[i]
481            coord.repr = value
482        coords.append(coord)
483    return report, coords, start
484
485
486def _bounds(data: List[str]) -> Tuple[List[str], List[Coord], List[str]]:
487    """Extract coordinate bounds by coord, navaid, and latterals"""
488    report, start = " ".join(data), -1
489    report, bounds, start = _bounds_from_latterals(report, start)
490    report, coords, start = _coords_from_text(report, start)
491    report, navs, start = _coords_from_navaids(report, start)
492    coords += navs
493    for target in ("FROM", "WI", "BOUNDED", "OBS"):
494        index = report.find(f"{target} ")
495        if index != -1 and index < start:
496            start = index
497    report = report[:start] + report[report.rfind("  ") :]
498    data = [s for s in report.split() if s]
499    return data, coords, bounds
500
501
502def _altitudes(
503    data: List[str], units: Units
504) -> Tuple[List[str], Units, Optional[Number], Optional[Number]]:
505    """Extract the floor and ceiling altitudes"""
506    floor, ceiling = None, None
507    for i, item in enumerate(data):
508        # BTN FL180 AND FL330
509        if item == "BTN" and len(data) > i + 2 and data[i + 2] == "AND":
510            floor, units = core.make_altitude(data[i + 1], units)
511            ceiling, units = core.make_altitude(data[i + 3], units)
512            data = data[:i] + data[i + 4 :]
513            break
514        # TOPS ABV FL450
515        if item in ("TOP", "TOPS", "BLW"):
516            if data[i + 1] == "ABV":
517                ceiling = core.make_number(f"ABV {data[i + 2]}")
518                data = data[:i] + data[i + 3 :]
519                break
520            if data[i + 1] == "BLW":
521                ceiling = core.make_number(f"BLW {data[i + 2]}")
522                data = data[:i] + data[i + 3 :]
523                break
524            # TOPS TO FL310
525            if data[i + 1] == "TO":
526                data.pop(i)
527            ceiling, units = core.make_altitude(data[i + 1], units)
528            data = data[:i] + data[i + 2 :]
529            # CIG BLW 010
530            if data[i - 1] == "CIG":
531                data.pop(i - 1)
532            break
533        # FL060/300 SFC/FL160
534        if core.is_altitude(item):
535            if "/" in item:
536                floor_val, ceiling_val = item.split("/")
537                floor, units = core.make_altitude(floor_val, units)
538                if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[
539                    :2
540                ] != "FL":
541                    ceiling, units = core.make_altitude(
542                        ceiling_val, units, force_fl=True
543                    )
544                else:
545                    ceiling, units = core.make_altitude(ceiling_val, units)
546            else:
547                ceiling, units = core.make_altitude(item, units)
548            data.pop(i)
549            break
550    return data, units, floor, ceiling
551
552
553def _weather_type(data: List[str]) -> Tuple[List[str], Optional[Code]]:
554    weather = None
555    report = " ".join(data)
556    for key, val in WEATHER_TYPES.items():
557        if key in report:
558            weather = Code(repr=key, value=val)
559            data = [i for i in report.replace(key, "").split() if i]
560            break
561    return data, weather
562
563
564def _intensity(data: List[str]) -> Tuple[List[str], Optional[Code]]:
565    if not data:
566        return data, None
567    try:
568        value = INTENSITY[data[-1]]
569        code = data.pop()
570        return data, Code(repr=code, value=value)
571    except KeyError:
572        return data, None
573
574
575def _sigmet_observation(
576    data: List[str], units: Units, issued: Optional[date] = None
577) -> Tuple[AirSigObservation, Units]:
578    data, start_time, end_time = _time(data, issued)
579    data, position = _position(data)
580    data, coords, bounds = _bounds(data)
581    data, units, movement = _movement(data, units)
582    data, intensity = _intensity(data)
583    data, units, floor, ceiling = _altitudes(data, units)
584    data, weather = _weather_type(data)
585    struct = AirSigObservation(
586        type=weather,
587        start_time=start_time,
588        end_time=end_time,
589        position=position,
590        floor=floor,
591        ceiling=ceiling,
592        coords=coords,
593        bounds=bounds,
594        movement=movement,
595        intensity=intensity,
596        other=_clean_flags(data),
597    )
598    return struct, units
599
600
601def _observations(
602    data: List[str], units: Units, issued: Optional[date] = None
603) -> Tuple[Units, Optional[AirSigObservation], Optional[AirSigObservation]]:
604    observation, forecast, forecast_index = None, None, -1
605    forecast_index = _first_index(data, "FCST", "OUTLOOK", "OTLK")
606    if forecast_index == -1:
607        observation, units = _sigmet_observation(data, units, issued)
608    # 6 is arbitrary. Will likely change or be more precise later
609    elif forecast_index < 6:
610        forecast, units = _sigmet_observation(data, units, issued)
611    else:
612        observation, units = _sigmet_observation(data[:forecast_index], units, issued)
613        forecast, units = _sigmet_observation(data[forecast_index:], units, issued)
614    return units, observation, forecast
615
616
617_REPLACE = {
618    " MO V ": " MOV ",
619    " STNRY": " STNR",
620    " STCNRY": " STNR",
621    " N-NE ": " NNE ",
622    " N-NW ": " NNW ",
623    " E-NE ": " ENE ",
624    " E-SE ": " ESE ",
625    " S-SE ": " SSE ",
626    " S-SW ": " SSW ",
627    " W-SW ": " WSW ",
628    " W-NW ": " WNW ",
629}
630
631
632def _find_first_digit(item: str) -> int:
633    return next((i for i, char in enumerate(item) if char.isdigit()), -1)
634
635
636def sanitize(report: str) -> str:
637    """Sanitized AIRMET / SIGMET report string"""
638    report = report.strip(" =")
639    for key, val in _REPLACE.items():
640        report = report.replace(key, val)
641    data = report.split()
642    for i, item in reversed(list(enumerate(data))):
643        # Remove extra element on altitude Ex: FL450Z skip 1000FT
644        if (
645            len(item) > 4
646            and not item[-1].isdigit()
647            and item[-2:] != "FT"
648            and item[-1] != "M"
649            and core.is_altitude(item[:-1])
650        ):
651            data[i] = item[:-1]
652        # Split attached movement direction Ex: NE05KT
653        if (
654            len(item) > 4
655            and (item.endswith("KT") or item.endswith("KMH"))
656            and item[: _find_first_digit(item)] in CARDINALS
657        ):
658            index = _find_first_digit(item)
659            direction = item[:index]
660            data.insert(i + 1, item[index:])
661            data[i] = direction
662    return " ".join(data)
663
664
665def parse(report: str, issued: Optional[date] = None) -> Tuple[AirSigmetData, Units]:
666    """Parse AIRMET / SIGMET report string"""
667    # pylint: disable=too-many-locals
668    units = Units.international()
669    sanitized = sanitize(report)
670    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
671    data, area, report_type, start_time, end_time, station = _spacetime(data)
672    body = sanitized[sanitized.find(" ".join(data[:2])) :]
673    # Trim AIRMET type
674    if data[0] == "AIRMET":
675        with suppress(ValueError):
676            data = data[data.index("<elip>") + 1 :]
677    data, region = _region(data)
678    units, observation, forecast = _observations(data, units, issued)
679    struct = AirSigmetData(
680        raw=report,
681        sanitized=sanitized,
682        station=station,
683        time=core.make_timestamp(time, target_date=issued),
684        remarks=None,
685        bulletin=bulletin,
686        issuer=issuer,
687        correction=correction,
688        area=area,
689        type=report_type,
690        start_time=core.make_timestamp(start_time, target_date=issued),
691        end_time=core.make_timestamp(end_time, target_date=issued),
692        body=body,
693        region=region,
694        observation=observation,
695        forecast=forecast,
696    )
697    return struct, units
class AirSigmet(avwx.base.AVWXBase):
 59class AirSigmet(AVWXBase):
 60    """
 61    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 62    other report when you supply the report string via `parse` or
 63    `from_report`.
 64
 65    ```python
 66    >>> from avwx import AirSigmet
 67    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 68    >>> sigmet = AirSigmet.from_report(report)
 69    True
 70    >>> sigmet.last_updated
 71    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 72    >>> sigmet.data.observation.coords
 73    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 74    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 75    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 76    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 77    >>> sigmet.data.observation.intensity
 78    Code(repr='NC', value='No change')
 79    >>> sigmet.data.observation.ceiling
 80    Number(repr='FL410', value=410, spoken='flight level four one zero')
 81    ```
 82    """
 83
 84    data: Optional[AirSigmetData] = None
 85
 86    def _post_parse(self) -> None:
 87        if self.raw:
 88            self.data, self.units = parse(self.raw, self.issued)
 89
 90    @staticmethod
 91    def sanitize(report: str) -> str:
 92        """Sanitizes the report string"""
 93        return sanitize(report)
 94
 95    def intersects(self, path: LineString) -> bool:
 96        """Returns True if the report area intersects a flight path"""
 97        if LineString is None:
 98            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
 99        if not self.data:
100            return False
101        for data in (self.data.observation, self.data.forecast):
102            if data:
103                poly = data.poly
104                if poly and path.intersects(poly):
105                    return True
106        return False
107
108    def contains(self, coord: Coord) -> bool:
109        """Returns True if the report area contains a coordinate"""
110        if not self.data:
111            return False
112        for data in (self.data.observation, self.data.forecast):
113            if data:
114                poly = data.poly
115                if poly and coord.point.within(poly):
116                    return True
117        return False

In addition to the manager, you can use the avwx.AirSigmet class like any other report when you supply the report string via parse or from_report.

>>> from avwx import AirSigmet
>>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
>>> sigmet = AirSigmet.from_report(report)
True
>>> sigmet.last_updated
datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
>>> sigmet.data.observation.coords
[Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
>>> sigmet.data.observation.intensity
Code(repr='NC', value='No change')
>>> sigmet.data.observation.ceiling
Number(repr='FL410', value=410, spoken='flight level four one zero')
data: Optional[avwx.structs.AirSigmetData] = None
@staticmethod
def sanitize(report: str) -> str:
90    @staticmethod
91    def sanitize(report: str) -> str:
92        """Sanitizes the report string"""
93        return sanitize(report)

Sanitizes the report string

def intersects(self, path: shapely.geometry.linestring.LineString) -> bool:
 95    def intersects(self, path: LineString) -> bool:
 96        """Returns True if the report area intersects a flight path"""
 97        if LineString is None:
 98            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
 99        if not self.data:
100            return False
101        for data in (self.data.observation, self.data.forecast):
102            if data:
103                poly = data.poly
104                if poly and path.intersects(poly):
105                    return True
106        return False

Returns True if the report area intersects a flight path

def contains(self, coord: avwx.structs.Coord) -> bool:
108    def contains(self, coord: Coord) -> bool:
109        """Returns True if the report area contains a coordinate"""
110        if not self.data:
111            return False
112        for data in (self.data.observation, self.data.forecast):
113            if data:
114                poly = data.poly
115                if poly and coord.point.within(poly):
116                    return True
117        return False

Returns True if the report area contains a coordinate

class AirSigManager:
120class AirSigManager:
121    """
122    Because of the global nature of these report types, we don't initialize a
123    report class with a station ident like the other report types. Instead, we
124    use a class to manage and update the list of all active SIGMET and AIRMET
125    reports.
126
127    ```python
128    >>> from avwx import AirSigManager
129    >>> from avwx.structs import Coord
130    >>> manager = AirSigManager()
131    >>> manager.update()
132    True
133    >>> manager.last_updated
134    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
135    >>> len(manager.reports)
136    113
137    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
138    5
139    >>> manager.reports[0].data.bulletin.type
140    Code(repr='WA', value='airmet')
141    >>> manager.reports[0].data.type
142    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
143    ```
144    """
145
146    _services: List[Service]
147    _raw: List[Tuple[str, str]]
148    last_updated: Optional[datetime] = None
149    raw: List[str]
150    reports: Optional[List[AirSigmet]] = None
151
152    def __init__(self):  # type: ignore
153        self._services = [NOAA_Bulk("airsigmet"), NOAA_Intl("airsigmet")]
154        self._raw, self.raw = [], []
155
156    async def _update(
157        self, index: int, timeout: int
158    ) -> List[Tuple[str, Optional[str]]]:
159        source = self._services[index].root
160        reports = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
161        raw: List[Tuple[str, Optional[str]]] = [
162            (report, source) for report in reports if report
163        ]
164        return raw
165
166    def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
167        """Updates fetched reports and returns whether they've changed"""
168        return aio.run(self.async_update(timeout, disable_post))
169
170    async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
171        """Updates fetched reports and returns whether they've changed"""
172        coros = [self._update(i, timeout) for i in range(len(self._services))]
173        data = await aio.gather(*coros)
174        raw = list(chain.from_iterable(data))
175        reports = [i[0] for i in raw]
176        changed = raw != self.raw
177        if changed:
178            self._raw, self.raw = raw, reports
179            self.last_updated = datetime.now(tz=timezone.utc)
180        # Parse reports if not disabled
181        if not disable_post:
182            parsed = []
183            for report, source in raw:
184                try:
185                    if obj := AirSigmet.from_report(report):
186                        obj.source = source
187                        parsed.append(obj)
188                except Exception as exc:  # pylint: disable=broad-except
189                    exceptions.exception_intercept(exc, raw=report)
190            self.reports = parsed
191        return changed
192
193    def along(self, coords: List[Coord]) -> List[AirSigmet]:
194        """Returns available reports the intersect a flight path"""
195        if LineString is None:
196            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
197        if self.reports is None:
198            return []
199        path = LineString([c.pair for c in coords])
200        return [r for r in self.reports if r.intersects(path)]
201
202    def contains(self, coord: Coord) -> List[AirSigmet]:
203        """Returns available reports that contain a coordinate"""
204        if self.reports is None:
205            return []
206        return [r for r in self.reports if r.contains(coord)]

Because of the global nature of these report types, we don't initialize a report class with a station ident like the other report types. Instead, we use a class to manage and update the list of all active SIGMET and AIRMET reports.

>>> from avwx import AirSigManager
>>> from avwx.structs import Coord
>>> manager = AirSigManager()
>>> manager.update()
True
>>> manager.last_updated
datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
>>> len(manager.reports)
113
>>> len(manager.contains(Coord(lat=33.12, lon=-105)))
5
>>> manager.reports[0].data.bulletin.type
Code(repr='WA', value='airmet')
>>> manager.reports[0].data.type
'AIRMET SIERRA FOR IFR AND MTN OBSCN'
last_updated: Optional[datetime.datetime] = None
raw: List[str]
reports: Optional[List[AirSigmet]] = None
def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
166    def update(self, timeout: int = 10, disable_post: bool = False) -> bool:
167        """Updates fetched reports and returns whether they've changed"""
168        return aio.run(self.async_update(timeout, disable_post))

Updates fetched reports and returns whether they've changed

async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
170    async def async_update(self, timeout: int = 10, disable_post: bool = False) -> bool:
171        """Updates fetched reports and returns whether they've changed"""
172        coros = [self._update(i, timeout) for i in range(len(self._services))]
173        data = await aio.gather(*coros)
174        raw = list(chain.from_iterable(data))
175        reports = [i[0] for i in raw]
176        changed = raw != self.raw
177        if changed:
178            self._raw, self.raw = raw, reports
179            self.last_updated = datetime.now(tz=timezone.utc)
180        # Parse reports if not disabled
181        if not disable_post:
182            parsed = []
183            for report, source in raw:
184                try:
185                    if obj := AirSigmet.from_report(report):
186                        obj.source = source
187                        parsed.append(obj)
188                except Exception as exc:  # pylint: disable=broad-except
189                    exceptions.exception_intercept(exc, raw=report)
190            self.reports = parsed
191        return changed

Updates fetched reports and returns whether they've changed

def along( self, coords: List[avwx.structs.Coord]) -> List[AirSigmet]:
193    def along(self, coords: List[Coord]) -> List[AirSigmet]:
194        """Returns available reports the intersect a flight path"""
195        if LineString is None:
196            raise ModuleNotFoundError("Install avwx-engine[shape] to use this feature")
197        if self.reports is None:
198            return []
199        path = LineString([c.pair for c in coords])
200        return [r for r in self.reports if r.intersects(path)]

Returns available reports the intersect a flight path

def contains( self, coord: avwx.structs.Coord) -> List[AirSigmet]:
202    def contains(self, coord: Coord) -> List[AirSigmet]:
203        """Returns available reports that contain a coordinate"""
204        if self.reports is None:
205            return []
206        return [r for r in self.reports if r.contains(coord)]

Returns available reports that contain a coordinate

def sanitize(report: str) -> str:
637def sanitize(report: str) -> str:
638    """Sanitized AIRMET / SIGMET report string"""
639    report = report.strip(" =")
640    for key, val in _REPLACE.items():
641        report = report.replace(key, val)
642    data = report.split()
643    for i, item in reversed(list(enumerate(data))):
644        # Remove extra element on altitude Ex: FL450Z skip 1000FT
645        if (
646            len(item) > 4
647            and not item[-1].isdigit()
648            and item[-2:] != "FT"
649            and item[-1] != "M"
650            and core.is_altitude(item[:-1])
651        ):
652            data[i] = item[:-1]
653        # Split attached movement direction Ex: NE05KT
654        if (
655            len(item) > 4
656            and (item.endswith("KT") or item.endswith("KMH"))
657            and item[: _find_first_digit(item)] in CARDINALS
658        ):
659            index = _find_first_digit(item)
660            direction = item[:index]
661            data.insert(i + 1, item[index:])
662            data[i] = direction
663    return " ".join(data)

Sanitized AIRMET / SIGMET report string

def parse( report: str, issued: Optional[datetime.date] = None) -> Tuple[avwx.structs.AirSigmetData, avwx.structs.Units]:
666def parse(report: str, issued: Optional[date] = None) -> Tuple[AirSigmetData, Units]:
667    """Parse AIRMET / SIGMET report string"""
668    # pylint: disable=too-many-locals
669    units = Units.international()
670    sanitized = sanitize(report)
671    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
672    data, area, report_type, start_time, end_time, station = _spacetime(data)
673    body = sanitized[sanitized.find(" ".join(data[:2])) :]
674    # Trim AIRMET type
675    if data[0] == "AIRMET":
676        with suppress(ValueError):
677            data = data[data.index("<elip>") + 1 :]
678    data, region = _region(data)
679    units, observation, forecast = _observations(data, units, issued)
680    struct = AirSigmetData(
681        raw=report,
682        sanitized=sanitized,
683        station=station,
684        time=core.make_timestamp(time, target_date=issued),
685        remarks=None,
686        bulletin=bulletin,
687        issuer=issuer,
688        correction=correction,
689        area=area,
690        type=report_type,
691        start_time=core.make_timestamp(start_time, target_date=issued),
692        end_time=core.make_timestamp(end_time, target_date=issued),
693        body=body,
694        region=region,
695        observation=observation,
696        forecast=forecast,
697    )
698    return struct, units

Parse AIRMET / SIGMET report string