
A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. They are divided into:

  • Convective - thunderstorms, hail, and cyclones
  • Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation

An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. They are divided into:

  • Sierra - IFR conditions like low ceilings and mountain obscuration
  • Tango - turbulence and high surface winds
  • Zulu - icing and freezing levels

Both types share a similar report format and therefore are combined into a single handling class. The Bulletin and weather type can be used to classify each as a SIGMET or AIRMET for filtering purposes.

 20# stdlib
 21from __future__ import annotations
 23import asyncio as aio
 24import re
 25from contextlib import suppress
 26from datetime import date, datetime, timezone
 27from itertools import chain
 29# library
 30from geopy.distance import distance as geo_distance  # type: ignore
 32# module
 33from avwx import exceptions
 34from avwx.base import AVWXBase
 35from avwx.exceptions import MissingExtraModule
 36from avwx.flight_path import to_coordinates
 37from avwx.load_utils import LazyLoad
 38from avwx.parsing import core
 39from avwx.service.bulk import NoaaBulk, NoaaIntl, Service
 40from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES
 41from avwx.static.core import CARDINAL_DEGREES, CARDINALS
 42from avwx.structs import (
 43    AirSigmetData,
 44    AirSigObservation,
 45    Bulletin,
 46    Code,
 47    Coord,
 48    Movement,
 49    Number,
 50    Timestamp,
 51    Units,
 55    from shapely.geometry import LineString  # type: ignore
 56except ModuleNotFoundError:
 57    LineString = None
 60class AirSigmet(AVWXBase):
 61    """
 62    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 63    other report when you supply the report string via `parse` or
 64    `from_report`.
 66    ```python
 67    >>> from avwx import AirSigmet
 68    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 69    >>> sigmet = AirSigmet.from_report(report)
 70    True
 71    >>> sigmet.last_updated
 72    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 73    >>> sigmet.data.observation.coords
 74    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 75    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 76    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 77    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 78    >>> sigmet.data.observation.intensity
 79    Code(repr='NC', value='No change')
 80    >>> sigmet.data.observation.ceiling
 81    Number(repr='FL410', value=410, spoken='flight level four one zero')
 82    ```
 83    """
 85    data: AirSigmetData | None = None
 87    def _post_parse(self) -> None:
 88        if self.raw:
 89            self.data, self.units = parse(self.raw, self.issued)
 91    @staticmethod
 92    def sanitize(report: str) -> str:
 93        """Sanitizes the report string"""
 94        return sanitize(report)
 96    def intersects(self, path: LineString) -> bool:
 97        """Returns True if the report area intersects a flight path"""
 98        if LineString is None:
 99            extra = "shape"
100            raise MissingExtraModule(extra)
101        if not self.data:
102            return False
103        for data in (self.data.observation, self.data.forecast):
104            if data:
105                poly = data.poly
106                if poly and path.intersects(poly):
107                    return True
108        return False
110    def contains(self, coord: Coord) -> bool:
111        """Returns True if the report area contains a coordinate"""
112        if not self.data:
113            return False
114        for data in (self.data.observation, self.data.forecast):
115            if data:
116                poly = data.poly
117                if poly and coord.point.within(poly):
118                    return True
119        return False
122class AirSigManager:
123    """
124    Because of the global nature of these report types, we don't initialize a
125    report class with a station ident like the other report types. Instead, we
126    use a class to manage and update the list of all active SIGMET and AIRMET
127    reports.
129    ```python
130    >>> from avwx import AirSigManager
131    >>> from avwx.structs import Coord
132    >>> manager = AirSigManager()
133    >>> manager.update()
134    True
135    >>> manager.last_updated
136    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
137    >>> len(manager.reports)
138    113
139    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
140    5
141    >>> manager.reports[0].data.bulletin.type
142    Code(repr='WA', value='airmet')
143    >>> manager.reports[0].data.type
145    ```
146    """
148    _services: list[Service]
149    _raw: list[tuple[str, str | None]]
150    last_updated: datetime | None = None
151    raw: list[str]
152    reports: list[AirSigmet] | None = None
154    def __init__(self):  # type: ignore
155        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
156        self._raw, self.raw = [], []
158    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
159        source = self._services[index].root
160        reports = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
161        raw: list[tuple[str, str | None]] = [(report, source) for report in reports if report]
162        return raw
164    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
165        """Updates fetched reports and returns whether they've changed"""
166        return aio.run(self.async_update(timeout, disable_post=disable_post))
168    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
169        """Updates fetched reports and returns whether they've changed"""
170        coros = [self._update(i, timeout) for i in range(len(self._services))]
171        data = await aio.gather(*coros)
172        raw = list(chain.from_iterable(data))
173        reports = [i[0] for i in raw]
174        if raw == self._raw:
175            return False
176        self._raw, self.raw = raw, reports
177        self.last_updated = datetime.now(tz=timezone.utc)
178        # Parse reports if not disabled
179        if not disable_post:
180            parsed = []
181            for report, source in raw:
182                try:
183                    if obj := AirSigmet.from_report(report):
184                        obj.source = source
185                        parsed.append(obj)
186                except Exception as exc:  # noqa: BLE001
187                    exceptions.exception_intercept(exc, raw={"report": report})
188            self.reports = parsed
189        return True
191    def along(self, coords: list[Coord]) -> list[AirSigmet]:
192        """Returns available reports the intersect a flight path"""
193        if LineString is None:
194            extra = "shape"
195            raise MissingExtraModule(extra)
196        if self.reports is None:
197            return []
198        path = LineString([c.pair for c in coords])
199        return [r for r in self.reports if r.intersects(path)]
201    def contains(self, coord: Coord) -> list[AirSigmet]:
202        """Returns available reports that contain a coordinate"""
203        if self.reports is None:
204            return []
205        return [r for r in self.reports if r.contains(coord)]
208# N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141
209_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?")
213_NAVAID_PATTERN = re.compile(r"\b(\d{1,3}[NESW]{1,3} [A-z]{3}-?\b)|((-|(TO )|(FROM ))[A-z]{3}\b)")
215# N OF N2050 AND S OF N2900
216_LATTERAL_PATTERN = re.compile(r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b")
218NAVAIDS = LazyLoad("navaids")
220# Used to assist parsing after sanitized. Removed after parse
221_FLAGS = {
222    "...": " <elip> ",
223    "..": " <elip> ",
224    ". ": " <break> ",
225    "/VIS ": " <vis> VIS ",
229def _parse_prep(report: str) -> list[str]:
230    """Prepares sanitized string by replacing elements with flags"""
231    report = report.rstrip(".")
232    for key, val in _FLAGS.items():
233        report = report.replace(key, val)
234    return report.split()
237def _clean_flags(data: list[str]) -> list[str]:
238    return [i for i in data if i[0] != "<"]
241def _bulletin(value: str) -> Bulletin:
242    # if len(value) != 6:
243    #     return None
244    type_key = value[:2]
245    return Bulletin(
246        repr=value,
247        type=Code(repr=type_key, value=BULLETIN_TYPES[type_key]),
248        country=value[2:4],
249        number=int(value[4:]),
250    )
253def _header(data: list[str]) -> tuple[list[str], Bulletin, str, str, str | None]:
254    bulletin = _bulletin(data[0])
255    correction, end = (data[3], 4) if len(data[3]) == 3 else (None, 3)
256    return data[end:], bulletin, data[1], data[2], correction
259def _spacetime(
260    data: list[str],
261) -> tuple[list[str], str, str, str | None, str, str | None]:
262    area = data.pop(0)
263    # Skip airmet type + time repeat
264    if data[0] == "WA" and data[1].isdigit():
265        data = data[2:]
266        area = area[:-1]  # Remove type label from 3-letter ident
267    valid_index = data.index("VALID")
268    report_type = " ".join(data[:valid_index])
269    data = data[valid_index + 1 :]
270    if data[0] == "UNTIL":
271        start_time = None
272        end_time = data[1]
273        data = data[2:]
274    else:
275        target = "-" if "-" in data[0] else "/"
276        start_time, end_time = data.pop(0).split(target)
277    # KMCO- ORL FIR
278    if data[0][-1] == "-":
279        station = data.pop(0)[:-1]
280    # KMCO - KMCO
281    elif data[1] == "-" and len(data[0]) == 4:
282        station = data.pop(0)
283        data.pop(0)
284    else:
285        station = None
286    return data, area, report_type, start_time, end_time, station
289def _first_index(data: list[str], *targets: str) -> int:
290    for target in targets:
291        with suppress(ValueError):
292            return data.index(target)
293    return -1
296def _region(data: list[str]) -> tuple[list[str], str]:
297    # FIR/CTA region name
298    # Or non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW
299    name_end = _first_index(data, "FIR", "CTA") + 1 or _first_index(data, "FROM")
300    # State list
301    if not name_end:
302        for item in data:
303            if len(item) == 2:
304                name_end += 1
305            else:
306                break
307    name = " ".join(data[:name_end])
308    return data[name_end:], name
311def _time(data: list[str], issued: date | None = None) -> tuple[list[str], Timestamp | None, Timestamp | None]:
312    """Extracts the start and/or end time based on a couple starting elements"""
313    index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK")
314    if index == -1:
315        return data, None, None
316    start_item = data.pop(index)
317    start, end, observed = None, None, None
318    if "-" in data[index]:
319        start_item, end_item = data.pop(index).split("-")
320        start = core.make_timestamp(start_item, time_only=len(start_item) < 6, target_date=issued)
321        end = core.make_timestamp(end_item, time_only=len(end_item) < 6, target_date=issued)
322    elif len(data[index]) >= 4 and data[index][:4].isdigit():
323        observed = core.make_timestamp(data.pop(index), time_only=True, target_date=issued)
324        if index > 0 and data[index - 1] == "OBS":
325            data.pop(index - 1)
326    for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"):
327        with suppress(ValueError):
328            data.remove(remv)
329    if observed:
330        if start_item in ("UNTIL", "VALID"):
331            end = observed
332        else:
333            start = observed
334    return data, start, end
337def _coord_value(value: str) -> float:
338    if value[0] in ("N", "S"):
339        index, strip, replace = 3, "N", "S"
340    else:
341        index, strip, replace = 4, "E", "W"
342    num = f"{value[:index]}.{value[index:]}".lstrip(strip).replace(replace, "-")
343    return float(num)
346def _position(data: list[str]) -> tuple[list[str], Coord | None]:
347    try:
348        index = data.index("PSN")
349    except ValueError:
350        return data, None
351    data.pop(index)
352    raw = f"{data[index]} {data[index + 1]}"
353    lat = _coord_value(data.pop(index))
354    lon = _coord_value(data.pop(index))
355    return data, Coord(lat=lat, lon=lon, repr=raw)
358def _movement(data: list[str], units: Units) -> tuple[list[str], Units, Movement | None]:
359    with suppress(ValueError):
360        data.remove("STNR")
361        speed = core.make_number("STNR")
362        return data, units, Movement(repr="STNR", direction=None, speed=speed)
363    try:
364        index = data.index("MOV")
365    except ValueError:
366        return data, units, None
367    raw = data.pop(index)
368    direction_str = data.pop(index)
369    # MOV CNL
370    if direction_str == "CNL":
371        return data, units, None
372    raw += f" {direction_str} "
373    # MOV FROM 23040KT
374    if direction_str == "FROM":
375        value = data[index][:3]
376        raw += value
377        direction = core.make_number(value)
378        data[index] = data[index][3:]
379    # MOV E 45KMH
380    else:
381        direction = core.make_number(direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES)
382    speed = None
383    with suppress(IndexError):
384        kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH")
385        if kt_unit or kmh_unit:
386            units.wind_speed = "kmh" if kmh_unit else "kt"
387            speed_str = data.pop(index)
388            raw += speed_str
389            # Remove bottom speed Ex: MOV W 05-10KT
390            if "-" in speed_str:
391                speed_str = speed_str[speed_str.find("-") + 1 :]
392            speed = core.make_number(speed_str[: -3 if kmh_unit else -2])
393    return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed)
396def _info_from_match(match: re.Match, start: int) -> tuple[str, int]:
397    """Returns the matching text and starting location if none yet available"""
398    if start == -1:
399        start = match.start()
400    return match.group(), start
403def _pre_break(report: str) -> str:
404    break_index = report.find(" <break> ")
405    return report[:break_index] if break_index != -1 else report
408def _bounds_from_latterals(report: str, start: int) -> tuple[str, list[str], int]:
409    """Extract coordinate latterals from report Ex: N OF N2050"""
410    bounds = []
411    for match in _LATTERAL_PATTERN.finditer(_pre_break(report)):
412        group, start = _info_from_match(match, start)
413        bounds.append(group.removesuffix(" AND"))
414        report = report.replace(group, " ")
415    return report, bounds, start
418def _coords_from_text(report: str, start: int) -> tuple[str, list[Coord], int]:
419    """Extract raw coordinate values from report Ex: N4409 E01506"""
420    coords = []
421    for match in _COORD_PATTERN.finditer(_pre_break(report)):
422        group, start = _info_from_match(match, start)
423        text = group.strip(" -")
424        lat, lon = text.split()
425        coord = Coord(lat=_coord_value(lat), lon=_coord_value(lon), repr=text)
426        coords.append(coord)
427        report = report.replace(group, " ")
428    return report, coords, start
431def _coords_from_navaids(report: str, start: int) -> tuple[str, list[Coord], int]:
432    """Extract navaid referenced coordinates from report Ex: 30SSW BNA"""
433    coords, navs = [], []
434    for match in _NAVAID_PATTERN.finditer(_pre_break(report)):
435        group, start = _info_from_match(match, start)
436        report = report.replace(group, " ")
437        group = group.strip("-").removeprefix("FROM ").removeprefix("TO ")
438        navs.append((group, *group.split()))
439    locs = to_coordinates([n[2 if len(n) == 3 else 1] for n in navs])
440    for i, nav in enumerate(navs):
441        value = nav[0]
442        if len(nav) == 3:
443            vector, num_index = nav[1], 0
444            while vector[num_index].isdigit():
445                num_index += 1
446            distance, bearing = (
447                int(vector[:num_index]),
448                CARDINAL_DEGREES[vector[num_index:]],
449            )
450            loc = geo_distance(nautical=distance).destination(locs[i].pair, bearing=bearing)
451            coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=value)
452        else:
453            coord = locs[i]
454            coord.repr = value
455        coords.append(coord)
456    return report, coords, start
459def _bounds(data: list[str]) -> tuple[list[str], list[Coord], list[str]]:
460    """Extract coordinate bounds by coord, navaid, and latterals"""
461    report, start = " ".join(data), -1
462    report, bounds, start = _bounds_from_latterals(report, start)
463    report, coords, start = _coords_from_text(report, start)
464    report, navs, start = _coords_from_navaids(report, start)
465    coords += navs
466    for target in ("FROM", "WI", "BOUNDED", "OBS"):
467        index = report.find(f"{target} ")
468        if index != -1 and index < start:
469            start = index
470    report = report[:start] + report[report.rfind("  ") :]
471    data = [s for s in report.split() if s]
472    return data, coords, bounds
475def _altitudes(data: list[str], units: Units) -> tuple[list[str], Units, Number | None, Number | None]:
476    """Extract the floor and ceiling altitudes"""
477    floor, ceiling = None, None
478    for i, item in enumerate(data):
479        # BTN FL180 AND FL330
480        if item == "BTN" and len(data) > i + 2 and data[i + 2] == "AND":
481            floor, units = core.make_altitude(data[i + 1], units)
482            ceiling, units = core.make_altitude(data[i + 3], units)
483            data = data[:i] + data[i + 4 :]
484            break
485        # TOPS ABV FL450
486        if item in ("TOP", "TOPS", "BLW"):
487            if data[i + 1] == "ABV":
488                ceiling = core.make_number(f"ABV {data[i + 2]}")
489                data = data[:i] + data[i + 3 :]
490                break
491            if data[i + 1] == "BLW":
492                ceiling = core.make_number(f"BLW {data[i + 2]}")
493                data = data[:i] + data[i + 3 :]
494                break
495            # TOPS TO FL310
496            if data[i + 1] == "TO":
497                data.pop(i)
498            ceiling, units = core.make_altitude(data[i + 1], units)
499            data = data[:i] + data[i + 2 :]
500            # CIG BLW 010
501            if data[i - 1] == "CIG":
502                data.pop(i - 1)
503            break
504        # FL060/300 SFC/FL160
505        if core.is_altitude(item):
506            if "/" in item:
507                floor_val, ceiling_val = item.split("/")
508                floor, units = core.make_altitude(floor_val, units)
509                if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[:2] != "FL":
510                    ceiling, units = core.make_altitude(ceiling_val, units, force_fl=True)
511                else:
512                    ceiling, units = core.make_altitude(ceiling_val, units)
513            else:
514                ceiling, units = core.make_altitude(item, units)
515            data.pop(i)
516            break
517    return data, units, floor, ceiling
520def _weather_type(data: list[str]) -> tuple[list[str], Code | None]:
521    weather = None
522    report = " ".join(data)
523    for key, val in WEATHER_TYPES.items():
524        if key in report:
525            weather = Code(repr=key, value=val)
526            data = [i for i in report.replace(key, "").split() if i]
527            break
528    return data, weather
531def _intensity(data: list[str]) -> tuple[list[str], Code | None]:
532    if not data:
533        return data, None
534    try:
535        value = INTENSITY[data[-1]]
536        code = data.pop()
537        return data, Code(repr=code, value=value)
538    except KeyError:
539        return data, None
542def _sigmet_observation(data: list[str], units: Units, issued: date | None = None) -> tuple[AirSigObservation, Units]:
543    data, start_time, end_time = _time(data, issued)
544    data, position = _position(data)
545    data, coords, bounds = _bounds(data)
546    data, units, movement = _movement(data, units)
547    data, intensity = _intensity(data)
548    data, units, floor, ceiling = _altitudes(data, units)
549    data, weather = _weather_type(data)
550    struct = AirSigObservation(
551        type=weather,
552        start_time=start_time,
553        end_time=end_time,
554        position=position,
555        floor=floor,
556        ceiling=ceiling,
557        coords=coords,
558        bounds=bounds,
559        movement=movement,
560        intensity=intensity,
561        other=_clean_flags(data),
562    )
563    return struct, units
566def _observations(
567    data: list[str], units: Units, issued: date | None = None
568) -> tuple[Units, AirSigObservation | None, AirSigObservation | None]:
569    observation, forecast, forecast_index = None, None, -1
570    forecast_index = _first_index(data, "FCST", "OUTLOOK", "OTLK")
571    if forecast_index == -1:
572        observation, units = _sigmet_observation(data, units, issued)
573    # 6 is arbitrary. Will likely change or be more precise later
574    elif forecast_index < 6:
575        forecast, units = _sigmet_observation(data, units, issued)
576    else:
577        observation, units = _sigmet_observation(data[:forecast_index], units, issued)
578        forecast, units = _sigmet_observation(data[forecast_index:], units, issued)
579    return units, observation, forecast
582_REPLACE = {
583    " MO V ": " MOV ",
584    " STNRY": " STNR",
585    " STCNRY": " STNR",
586    " N-NE ": " NNE ",
587    " N-NW ": " NNW ",
588    " E-NE ": " ENE ",
589    " E-SE ": " ESE ",
590    " S-SE ": " SSE ",
591    " S-SW ": " SSW ",
592    " W-SW ": " WSW ",
593    " W-NW ": " WNW ",
597def _find_first_digit(item: str) -> int:
598    return next((i for i, char in enumerate(item) if char.isdigit()), -1)
601def sanitize(report: str) -> str:
602    """Sanitized AIRMET / SIGMET report string"""
603    report = report.strip(" =")
604    for key, val in _REPLACE.items():
605        report = report.replace(key, val)
606    data = report.split()
607    for i, item in reversed(list(enumerate(data))):
608        # Remove extra element on altitude Ex: FL450Z skip 1000FT
609        if (
610            len(item) > 4
611            and not item[-1].isdigit()
612            and item[-2:] != "FT"
613            and item[-1] != "M"
614            and core.is_altitude(item[:-1])
615        ):
616            data[i] = item[:-1]
617        # Split attached movement direction Ex: NE05KT
618        if len(item) >= 4 and item.endswith(("KT", "KMH")) and item[: _find_first_digit(item)] in CARDINALS:
619            index = _find_first_digit(item)
620            direction = item[:index]
621            data.insert(i + 1, item[index:])
622            data[i] = direction
623    return " ".join(data)
626def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
627    """Parse AIRMET / SIGMET report string"""
628    units = Units.international()
629    sanitized = sanitize(report)
630    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
631    data, area, report_type, start_time, end_time, station = _spacetime(data)
632    body = sanitized[sanitized.find(" ".join(data[:2])) :]
633    # Trim AIRMET type
634    if data[0] == "AIRMET":
635        with suppress(ValueError):
636            data = data[data.index("<elip>") + 1 :]
637    data, region = _region(data)
638    units, observation, forecast = _observations(data, units, issued)
639    struct = AirSigmetData(
640        raw=report,
641        sanitized=sanitized,
642        station=station,
643        time=core.make_timestamp(time, target_date=issued),
644        remarks=None,
645        bulletin=bulletin,
646        issuer=issuer,
647        correction=correction,
648        area=area,
649        type=report_type,
650        start_time=core.make_timestamp(start_time, target_date=issued),
651        end_time=core.make_timestamp(end_time, target_date=issued),
652        body=body,
653        region=region,
654        observation=observation,
655        forecast=forecast,
656    )
657    return struct, units
