avwx.current.airsigmet

A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. They are divided into:

  • Convective - thunderstorms, hail, and cyclones
  • Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation

An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. They are divided into:

  • Sierra - IFR conditions like low ceilings and mountain obscuration
  • Tango - turbulence and high surface winds
  • Zulu - icing and freezing levels

Both types share a similar report format and therefore are combined into a single handling class. The Bulletin and weather type can be used to classify each as a SIGMET or AIRMET for filtering purposes.

  1"""
  2A SIGMET (Significant Meteorological Information) is a weather advisory for the
  3safety of all aircraft. They are divided into:
  4
  5- Convective - thunderstorms, hail, and cyclones
  6- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation
  7
  8An AIRMET (Airman's Meteorological Information) is a weather advisory for
  9smaller aircraft or VFR navigation. They are divided into:
 10
 11- Sierra - IFR conditions like low ceilings and mountain obscuration
 12- Tango - turbulence and high surface winds
 13- Zulu - icing and freezing levels
 14
 15Both types share a similar report format and therefore are combined into a
 16single handling class. The `Bulletin` and weather type can be used to classify
 17each as a SIGMET or AIRMET for filtering purposes.
 18"""
 19
 20# stdlib
 21from __future__ import annotations
 22
 23import asyncio as aio
 24import re
 25from contextlib import suppress
 26from datetime import date, datetime, timezone
 27from itertools import chain
 28from typing import TypeAlias
 29
 30# library
 31from geopy.distance import distance as geo_distance  # type: ignore
 32
 33# module
 34from avwx import exceptions
 35from avwx.base import AVWXBase
 36from avwx.exceptions import MissingExtraModule
 37from avwx.flight_path import to_coordinates
 38from avwx.load_utils import LazyLoad
 39from avwx.parsing import core
 40from avwx.service.bulk import NoaaBulk, NoaaIntl, Service
 41from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES
 42from avwx.static.core import CARDINAL_DEGREES, CARDINALS
 43from avwx.structs import (
 44    AirSigmetData,
 45    AirSigObservation,
 46    Bulletin,
 47    Code,
 48    Coord,
 49    Movement,
 50    Number,
 51    Timestamp,
 52    Units,
 53)
 54
 55try:
 56    from shapely.geometry import LineString
 57except ModuleNotFoundError:
 58    LineString = TypeAlias  # type: ignore
 59
 60
 61class AirSigmet(AVWXBase):
 62    """
 63    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 64    other report when you supply the report string via `parse` or
 65    `from_report`.
 66
 67    ```python
 68    >>> from avwx import AirSigmet
 69    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 70    >>> sigmet = AirSigmet.from_report(report)
 71    True
 72    >>> sigmet.last_updated
 73    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 74    >>> sigmet.data.observation.coords
 75    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 76    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 77    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 78    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 79    >>> sigmet.data.observation.intensity
 80    Code(repr='NC', value='No change')
 81    >>> sigmet.data.observation.ceiling
 82    Number(repr='FL410', value=410, spoken='flight level four one zero')
 83    ```
 84    """
 85
 86    data: AirSigmetData | None = None
 87
 88    def _post_parse(self) -> None:
 89        if self.raw:
 90            self.data, self.units = parse(self.raw, self.issued)
 91
 92    @staticmethod
 93    def sanitize(report: str) -> str:
 94        """Sanitizes the report string"""
 95        return sanitize(report)
 96
 97    def intersects(self, path: LineString) -> bool:
 98        """Returns True if the report area intersects a flight path"""
 99        if LineString is None:
100            extra = "shape"
101            raise MissingExtraModule(extra)
102        if not self.data:
103            return False
104        for data in (self.data.observation, self.data.forecast):
105            if data:
106                poly = data.poly
107                if poly and path.intersects(poly):
108                    return True
109        return False
110
111    def contains(self, coord: Coord) -> bool:
112        """Returns True if the report area contains a coordinate"""
113        if not self.data:
114            return False
115        for data in (self.data.observation, self.data.forecast):
116            if data:
117                poly = data.poly
118                if poly and coord.point.within(poly):
119                    return True
120        return False
121
122
class AirSigManager:
    """
    Because of the global nature of these report types, we don't initialize a
    report class with a station ident like the other report types. Instead, we
    use a class to manage and update the list of all active SIGMET and AIRMET
    reports.

    ```python
    >>> from avwx import AirSigManager
    >>> from avwx.structs import Coord
    >>> manager = AirSigManager()
    >>> manager.update()
    True
    >>> manager.last_updated
    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
    >>> len(manager.reports)
    113
    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
    5
    >>> manager.reports[0].data.bulletin.type
    Code(repr='WA', value='airmet')
    >>> manager.reports[0].data.type
    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
    ```
    """

    _services: list[Service]
    _raw: list[tuple[str, str | None]]
    last_updated: datetime | None = None
    raw: list[str]
    reports: list[AirSigmet] | None = None

    def __init__(self):  # type: ignore
        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
        self._raw = []
        self.raw = []

    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
        """Fetches one service and pairs each non-empty report with the service root"""
        service = self._services[index]
        source = service.root
        fetched = await service.async_fetch(timeout=timeout)  # type: ignore
        return [(report, source) for report in fetched if report]

    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        return aio.run(self.async_update(timeout, disable_post=disable_post))

    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed

        Reports are only parsed into `reports` when `disable_post` is False.
        """
        batches = await aio.gather(*(self._update(i, timeout) for i in range(len(self._services))))
        raw = [pair for batch in batches for pair in batch]
        if raw == self._raw:
            return False
        self._raw = raw
        self.raw = [text for text, _ in raw]
        self.last_updated = datetime.now(tz=timezone.utc)
        if disable_post:
            return True
        # A report that fails to parse is handed to the exception intercept and skipped
        parsed = []
        for report, source in raw:
            try:
                obj = AirSigmet.from_report(report)
                if obj:
                    obj.source = source
                    parsed.append(obj)
            except Exception as exc:  # noqa: BLE001
                exceptions.exception_intercept(exc, raw={"report": report})
        self.reports = parsed
        return True

    def along(self, coords: list[Coord]) -> list[AirSigmet]:
        """Returns available reports that intersect a flight path

        Raises MissingExtraModule when `LineString` is None.
        """
        if LineString is None:
            extra = "shape"
            raise MissingExtraModule(extra)
        if self.reports is None:
            return []
        path = LineString([coord.pair for coord in coords])
        return [report for report in self.reports if report.intersects(path)]

    def contains(self, coord: Coord) -> list[AirSigmet]:
        """Returns available reports that contain a coordinate"""
        if self.reports is None:
            return []
        return [report for report in self.reports if report.contains(coord)]
207
208
209# N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141
210_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?")
211
212# FROM 60NW ISN-INL-TVC-GIJ-UIN-FSD-BIL-60NW ISN
213# FROM 70SSW ISN TO 20NNW FAR TO 70E DLH TO 40SE EAU TO 80SE RAP TO 40NNW BFF TO 70SSW
214_NAVAID_PATTERN = re.compile(r"\b(\d{1,3}[NESW]{1,3} [A-z]{3}-?\b)|((-|(TO )|(FROM ))[A-z]{3}\b)")
215
216# N OF N2050 AND S OF N2900
217_LATTERAL_PATTERN = re.compile(r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b")
218
# NOTE(review): presumably defers loading the "navaids" dataset until first
# use - confirm against avwx.load_utils.LazyLoad
NAVAIDS = LazyLoad("navaids")
220
# Used to assist parsing after sanitized. Removed after parse
# _parse_prep applies these replacements in insertion order, so "..." is
# listed before ".." to be swapped for a single flag
_FLAGS = {
    "...": " <elip> ",
    "..": " <elip> ",
    ". ": " <break> ",
    "/VIS ": " <vis> VIS ",
}
228
229
def _parse_prep(report: str) -> list[str]:
    """Swaps punctuation markers for parser flags and splits the report into elements"""
    text = report.rstrip(".")
    for marker, flag in _FLAGS.items():
        text = text.replace(marker, flag)
    return text.split()
236
237
238def _clean_flags(data: list[str]) -> list[str]:
239    return [i for i in data if i[0] != "<"]
240
241
def _bulletin(value: str) -> Bulletin:
    """Builds a Bulletin from an identifier like WSPR31

    The first two characters are the type key looked up in BULLETIN_TYPES, the
    next two are the country, and the remainder is parsed as the number.
    """
    kind, country, number = value[:2], value[2:4], value[4:]
    return Bulletin(
        repr=value,
        type=Code(repr=kind, value=BULLETIN_TYPES[kind]),
        country=country,
        number=int(number),
    )
252
253
def _header(data: list[str]) -> tuple[list[str], Bulletin, str, str, str | None]:
    """Splits off the header: bulletin, issuer, time, and optional correction

    The fourth element is treated as a correction only when it is exactly three
    characters long. Returns the elements after the header first.
    """
    bulletin = _bulletin(data[0])
    has_correction = len(data[3]) == 3
    correction = data[3] if has_correction else None
    remaining = data[4:] if has_correction else data[3:]
    return remaining, bulletin, data[1], data[2], correction
258
259
260def _spacetime(
261    data: list[str],
262) -> tuple[list[str], str, str, str | None, str, str | None]:
263    area = data.pop(0)
264    # Skip airmet type + time repeat
265    if data[0] == "WA" and data[1].isdigit():
266        data = data[2:]
267        area = area[:-1]  # Remove type label from 3-letter ident
268    valid_index = data.index("VALID")
269    report_type = " ".join(data[:valid_index])
270    data = data[valid_index + 1 :]
271    if data[0] == "UNTIL":
272        start_time = None
273        end_time = data[1]
274        data = data[2:]
275    else:
276        target = "-" if "-" in data[0] else "/"
277        start_time, end_time = data.pop(0).split(target)
278    # KMCO- ORL FIR
279    if data[0][-1] == "-":
280        station = data.pop(0)[:-1]
281    # KMCO - KMCO
282    elif data[1] == "-" and len(data[0]) == 4:
283        station = data.pop(0)
284        data.pop(0)
285    else:
286        station = None
287    return data, area, report_type, start_time, end_time, station
288
289
290def _first_index(data: list[str], *targets: str) -> int:
291    for target in targets:
292        with suppress(ValueError):
293            return data.index(target)
294    return -1
295
296
def _region(data: list[str]) -> tuple[list[str], str]:
    """Splits the leading region name from the remaining elements

    The name ends after the first FIR/CTA element, or before FROM when neither
    is present. Otherwise the leading run of two-character elements is used as
    a state list. Returns the remaining elements and the region name.
    """
    # FIR/CTA region name
    name_end = _first_index(data, "FIR", "CTA") + 1
    # Or non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW
    if not name_end:
        name_end = _first_index(data, "FROM")
    # State list. Also the fallback when no marker was found: the -1 "not
    # found" result must not be used as a slice end
    if name_end < 1:
        name_end = 0
        for item in data:
            if len(item) != 2:
                break
            name_end += 1
    return data[name_end:], " ".join(data[:name_end])
310
311
def _time(data: list[str], issued: date | None = None) -> tuple[list[str], Timestamp | None, Timestamp | None]:
    """Extracts the start and/or end time based on a couple starting elements

    Looks for the first of AT, FCST, UNTIL, VALID, OUTLOOK, or OTLK and reads
    the element after it as either a "start-end" range or a single time. A
    single time is the end time after UNTIL/VALID and the start time otherwise.
    `issued` is passed as the target date for the timestamps. Returns the
    remaining elements, start, and end.
    """
    index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK")
    if index == -1:
        return data, None, None
    keyword = data.pop(index)
    start, end, observed = None, None, None
    # The keyword may have been the final element, leaving no value to read
    if index < len(data):
        value = data[index]
        if "-" in value:
            start_item, end_item = data.pop(index).split("-")
            start = core.make_timestamp(start_item, time_only=len(start_item) < 6, target_date=issued)
            end = core.make_timestamp(end_item, time_only=len(end_item) < 6, target_date=issued)
        elif len(value) >= 4 and value[:4].isdigit():
            observed = core.make_timestamp(data.pop(index), time_only=True, target_date=issued)
            # OBS AT 0510Z: drop the OBS that preceded the keyword
            if index > 0 and data[index - 1] == "OBS":
                data.pop(index - 1)
    for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"):
        with suppress(ValueError):
            data.remove(remv)
    if observed:
        if keyword in ("UNTIL", "VALID"):
            end = observed
        else:
            start = observed
    return data, start, end
336
337
338def _coord_value(value: str) -> float:
339    if value[0] in ("N", "S"):
340        index, strip, replace = 3, "N", "S"
341    else:
342        index, strip, replace = 4, "E", "W"
343    num = f"{value[:index]}.{value[index:]}".lstrip(strip).replace(replace, "-")
344    return float(num)
345
346
def _position(data: list[str]) -> tuple[list[str], Coord | None]:
    """Extracts the coordinate following PSN, removing all three elements

    Returns the data unchanged with None when PSN is not present.
    """
    if "PSN" not in data:
        return data, None
    index = data.index("PSN")
    del data[index]
    # Latitude and longitude now sit at index and index + 1
    raw = f"{data[index]} {data[index + 1]}"
    lat = _coord_value(data.pop(index))
    lon = _coord_value(data.pop(index))
    return data, Coord(lat=lat, lon=lon, repr=raw)
357
358
def _movement(data: list[str], units: Units) -> tuple[list[str], Units, Movement | None]:
    """Extracts the movement direction and speed, removing the consumed elements

    Returns the remaining elements, the units (wind_speed is set when a KT or
    KMH speed is found), and the Movement, or None when no movement is found or
    it is reported as MOV CNL.
    """
    # Stationary. list.remove raises ValueError when STNR is absent, which skips this block
    with suppress(ValueError):
        data.remove("STNR")
        speed = core.make_number("STNR")
        return data, units, Movement(repr="STNR", direction=None, speed=speed)
    try:
        index = data.index("MOV")
    except ValueError:
        return data, units, None
    # After each pop the next element to read slides into the same index
    raw = data.pop(index)
    direction_str = data.pop(index)
    # MOV CNL
    if direction_str == "CNL":
        return data, units, None
    raw += f" {direction_str} "
    # MOV FROM 23040KT
    if direction_str == "FROM":
        # First three characters are the direction. The rest stays in place to be read as the speed
        value = data[index][:3]
        raw += value
        direction = core.make_number(value)
        data[index] = data[index][3:]
    # MOV E 45KMH
    else:
        direction = core.make_number(direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES)
    speed = None
    # No element may remain after the direction, in which case speed stays None
    with suppress(IndexError):
        kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH")
        if kt_unit or kmh_unit:
            units.wind_speed = "kmh" if kmh_unit else "kt"
            speed_str = data.pop(index)
            raw += speed_str
            # Remove bottom speed Ex: MOV W 05-10KT
            if "-" in speed_str:
                speed_str = speed_str[speed_str.find("-") + 1 :]
            # Strip the unit suffix before parsing the number
            speed = core.make_number(speed_str[: -3 if kmh_unit else -2])
    return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed)
395
396
397def _info_from_match(match: re.Match, start: int) -> tuple[str, int]:
398    """Returns the matching text and starting location if none yet available"""
399    if start == -1:
400        start = match.start()
401    return match.group(), start
402
403
404def _pre_break(report: str) -> str:
405    break_index = report.find(" <break> ")
406    return report[:break_index] if break_index != -1 else report
407
408
def _bounds_from_latterals(report: str, start: int) -> tuple[str, list[str], int]:
    """Extract lateral coordinate bounds from report Ex: N OF N2050

    Only text before the first break flag is searched. Each matched text is
    replaced by a space throughout the report. Returns the updated report, the
    bounds without a trailing " AND", and the start location.
    """
    found = []
    searchable = _pre_break(report)
    for match in _LATTERAL_PATTERN.finditer(searchable):
        text, start = _info_from_match(match, start)
        found.append(text.removesuffix(" AND"))
        report = report.replace(text, " ")
    return report, found, start
417
418
def _coords_from_text(report: str, start: int) -> tuple[str, list[Coord], int]:
    """Extract raw coordinate values from report Ex: N4409 E01506

    Only text before the first break flag is searched. Each matched text is
    replaced by a space throughout the report. Returns the updated report, the
    coordinates, and the start location.
    """
    found = []
    for match in _COORD_PATTERN.finditer(_pre_break(report)):
        matched, start = _info_from_match(match, start)
        # Drop the " -" separator that may trail a matched pair
        pair = matched.strip(" -")
        lat_text, lon_text = pair.split()
        found.append(Coord(lat=_coord_value(lat_text), lon=_coord_value(lon_text), repr=pair))
        report = report.replace(matched, " ")
    return report, found, start
430
431
def _coords_from_navaids(report: str, start: int) -> tuple[str, list[Coord], int]:
    """Extract navaid referenced coordinates from report Ex: 30SSW BNA

    Only text before the first break flag is searched. Each matched text is
    replaced by a space throughout the report. Returns the updated report, the
    coordinates, and the start location.
    """
    entries = []
    for match in _NAVAID_PATTERN.finditer(_pre_break(report)):
        text, start = _info_from_match(match, start)
        report = report.replace(text, " ")
        label = text.strip("-").removeprefix("FROM ").removeprefix("TO ")
        # (label, ident) or (label, vector, ident)
        entries.append((label, *label.split()))
    stations = to_coordinates([entry[2 if len(entry) == 3 else 1] for entry in entries])
    coords = []
    for i, entry in enumerate(entries):
        label = entry[0]
        if len(entry) != 3:
            # Bare ident: reuse the station coordinate under the matched label
            coord = stations[i]
            coord.repr = label
        else:
            # Vector like 30SSW: leading digits are nautical miles, the rest is a cardinal direction
            vector = entry[1]
            digits = 0
            while vector[digits].isdigit():
                digits += 1
            distance = int(vector[:digits])
            bearing = CARDINAL_DEGREES[vector[digits:]]
            loc = geo_distance(nautical=distance).destination(stations[i].pair, bearing=bearing)
            coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=label)
        coords.append(coord)
    return report, coords, start
458
459
def _bounds(data: list[str]) -> tuple[list[str], list[Coord], list[str]]:
    """Extract coordinate bounds by coord, navaid, and lateral bounds

    Returns the remaining elements, the coordinates (text coordinates followed
    by navaid coordinates), and the lateral bounds.
    """
    report = " ".join(data)
    report, bounds, start = _bounds_from_latterals(report, -1)
    report, coords, start = _coords_from_text(report, start)
    report, nav_coords, start = _coords_from_navaids(report, start)
    coords.extend(nav_coords)
    # Pull the cut point back to an introducing keyword found before the first match
    for keyword in ("FROM", "WI", "BOUNDED", "OBS"):
        found = report.find(f"{keyword} ")
        if -1 < found < start:
            start = found
    # Drop everything from the cut point up to the last double space
    tail = report.rfind("  ")
    report = report[:start] + report[tail:]
    return report.split(), coords, bounds
474
475
def _altitudes(data: list[str], units: Units) -> tuple[list[str], Units, Number | None, Number | None]:
    """Extract the floor and ceiling altitudes

    Elements are scanned in order and only the first altitude expression found
    is consumed. Returns the remaining elements, the units as returned by
    `core.make_altitude`, and the floor and ceiling (None when not found).
    """
    floor, ceiling = None, None
    for i, item in enumerate(data):
        # BTN FL180 AND FL330
        if item == "BTN" and len(data) > i + 2 and data[i + 2] == "AND":
            floor, units = core.make_altitude(data[i + 1], units)
            ceiling, units = core.make_altitude(data[i + 3], units)
            data = data[:i] + data[i + 4 :]
            break
        # TOPS ABV FL450
        if item in ("TOP", "TOPS", "BLW"):
            if data[i + 1] == "ABV":
                ceiling = core.make_number(f"ABV {data[i + 2]}")
                data = data[:i] + data[i + 3 :]
                break
            if data[i + 1] == "BLW":
                ceiling = core.make_number(f"BLW {data[i + 2]}")
                data = data[:i] + data[i + 3 :]
                break
            # TOPS TO FL310
            if data[i + 1] == "TO":
                data.pop(i)
            ceiling, units = core.make_altitude(data[i + 1], units)
            data = data[:i] + data[i + 2 :]
            # CIG BLW 010
            # Only look back when an element precedes the keyword: with i == 0
            # the index -1 would wrap around to the end of the list
            if i > 0 and data[i - 1] == "CIG":
                data.pop(i - 1)
            break
        # FL060/300 SFC/FL160
        if core.is_altitude(item):
            if "/" in item:
                floor_val, ceiling_val = item.split("/")
                floor, units = core.make_altitude(floor_val, units)
                # A bare ceiling after an SFC or FL floor is read as a flight level
                if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[:2] != "FL":
                    ceiling, units = core.make_altitude(ceiling_val, units, force_fl=True)
                else:
                    ceiling, units = core.make_altitude(ceiling_val, units)
            else:
                ceiling, units = core.make_altitude(item, units)
            data.pop(i)
            break
    return data, units, floor, ceiling
519
520
def _weather_type(data: list[str]) -> tuple[list[str], Code | None]:
    """Extracts the first WEATHER_TYPES key found in the joined elements

    Every occurrence of that key is removed from the text before it is split
    again. Returns the data unchanged with None when no key is found.
    """
    report = " ".join(data)
    found = next(((key, val) for key, val in WEATHER_TYPES.items() if key in report), None)
    if found is None:
        return data, None
    key, val = found
    return report.replace(key, "").split(), Code(repr=key, value=val)
530
531
def _intensity(data: list[str]) -> tuple[list[str], Code | None]:
    """Extracts the intensity code when it is the final element

    The final element is removed only when it is a key of INTENSITY.
    """
    if not data:
        return data, None
    try:
        value = INTENSITY[data[-1]]
    except KeyError:
        return data, None
    code = data.pop()
    return data, Code(repr=code, value=value)
541
542
def _sigmet_observation(data: list[str], units: Units, issued: date | None = None) -> tuple[AirSigObservation, Units]:
    """Builds an AirSigObservation from the elements of one observation or forecast

    Each helper removes the elements it consumes and passes on the rest.
    Whatever remains, minus parser flags, is stored as `other`. Returns the
    observation and the updated units.
    """
    data, start_time, end_time = _time(data, issued)
    data, position = _position(data)
    data, coords, bounds = _bounds(data)
    data, units, movement = _movement(data, units)
    # _intensity only inspects the final element
    data, intensity = _intensity(data)
    data, units, floor, ceiling = _altitudes(data, units)
    data, weather = _weather_type(data)
    struct = AirSigObservation(
        type=weather,
        start_time=start_time,
        end_time=end_time,
        position=position,
        floor=floor,
        ceiling=ceiling,
        coords=coords,
        bounds=bounds,
        movement=movement,
        intensity=intensity,
        other=_clean_flags(data),
    )
    return struct, units
565
566
def _observations(
    data: list[str], units: Units, issued: date | None = None
) -> tuple[Units, AirSigObservation | None, AirSigObservation | None]:
    """Splits the elements into observation and forecast sections and parses each

    Without a FCST/OUTLOOK/OTLK element everything is the observation. When the
    keyword appears within the first six elements everything is the forecast.
    Otherwise the elements are split at the keyword.
    """
    forecast_index = _first_index(data, "FCST", "OUTLOOK", "OTLK")
    if forecast_index == -1:
        observation, units = _sigmet_observation(data, units, issued)
        return units, observation, None
    # 6 is arbitrary. Will likely change or be more precise later
    if forecast_index < 6:
        forecast, units = _sigmet_observation(data, units, issued)
        return units, None, forecast
    observation, units = _sigmet_observation(data[:forecast_index], units, issued)
    forecast, units = _sigmet_observation(data[forecast_index:], units, issued)
    return units, observation, forecast
581
582
# Substring replacements applied by sanitize in insertion order: spelling
# variants of MOV and STNR, and hyphenated directions joined into one element
_REPLACE = {
    " MO V ": " MOV ",
    " STNRY": " STNR",
    " STCNRY": " STNR",
    " N-NE ": " NNE ",
    " N-NW ": " NNW ",
    " E-NE ": " ENE ",
    " E-SE ": " ESE ",
    " S-SE ": " SSE ",
    " S-SW ": " SSW ",
    " W-SW ": " WSW ",
    " W-NW ": " WNW ",
}
596
597
598def _find_first_digit(item: str) -> int:
599    return next((i for i, char in enumerate(item) if char.isdigit()), -1)
600
601
def sanitize(report: str) -> str:
    """Sanitized AIRMET / SIGMET report string

    Strips surrounding spaces and "=", applies the _REPLACE substitutions, then
    repairs individual elements: a stray trailing character on an altitude is
    dropped and a direction attached to a speed is split into two elements.
    """
    report = report.strip(" =")
    for old, new in _REPLACE.items():
        report = report.replace(old, new)
    data = report.split()
    # Walk backwards so an inserted element never shifts an index still to visit
    for i in range(len(data) - 1, -1, -1):
        item = data[i]
        # Remove extra element on altitude Ex: FL450Z skip 1000FT
        if (
            len(item) > 4
            and not item[-1].isdigit()
            and item[-2:] != "FT"
            and item[-1] != "M"
            and core.is_altitude(item[:-1])
        ):
            data[i] = item[:-1]
        # Split attached movement direction Ex: NE05KT
        if len(item) < 4 or not item.endswith(("KT", "KMH")):
            continue
        digit_at = _find_first_digit(item)
        if item[:digit_at] in CARDINALS:
            data[i] = item[:digit_at]
            data.insert(i + 1, item[digit_at:])
    return " ".join(data)
625
626
def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
    """Parse AIRMET / SIGMET report string

    `issued` is passed as the target date when building timestamps. Returns the
    parsed data and the units, which start from `Units.international()`.
    """
    units = Units.international()
    sanitized = sanitize(report)
    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
    data, area, report_type, start_time, end_time, station = _spacetime(data)
    # Body text starts where the first two remaining elements appear in the sanitized report
    # NOTE(review): str.find returns -1 when that text is absent (e.g. one of
    # the elements is a parser flag), which would leave only the final
    # character as the body - confirm this cannot happen
    body = sanitized[sanitized.find(" ".join(data[:2])) :]
    # Trim AIRMET type
    if data[0] == "AIRMET":
        # Keep only what follows the first <elip> flag. No flag leaves data as-is
        with suppress(ValueError):
            data = data[data.index("<elip>") + 1 :]
    data, region = _region(data)
    units, observation, forecast = _observations(data, units, issued)
    struct = AirSigmetData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=None,
        bulletin=bulletin,
        issuer=issuer,
        correction=correction,
        area=area,
        type=report_type,
        start_time=core.make_timestamp(start_time, target_date=issued),
        end_time=core.make_timestamp(end_time, target_date=issued),
        body=body,
        region=region,
        observation=observation,
        forecast=forecast,
    )
    return struct, units
class AirSigmet(avwx.base.AVWXBase):
 62class AirSigmet(AVWXBase):
 63    """
 64    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 65    other report when you supply the report string via `parse` or
 66    `from_report`.
 67
 68    ```python
 69    >>> from avwx import AirSigmet
 70    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 71    >>> sigmet = AirSigmet.from_report(report)
 72    True
 73    >>> sigmet.last_updated
 74    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 75    >>> sigmet.data.observation.coords
 76    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 77    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 78    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 79    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 80    >>> sigmet.data.observation.intensity
 81    Code(repr='NC', value='No change')
 82    >>> sigmet.data.observation.ceiling
 83    Number(repr='FL410', value=410, spoken='flight level four one zero')
 84    ```
 85    """
 86
 87    data: AirSigmetData | None = None
 88
 89    def _post_parse(self) -> None:
 90        if self.raw:
 91            self.data, self.units = parse(self.raw, self.issued)
 92
 93    @staticmethod
 94    def sanitize(report: str) -> str:
 95        """Sanitizes the report string"""
 96        return sanitize(report)
 97
 98    def intersects(self, path: LineString) -> bool:
 99        """Returns True if the report area intersects a flight path"""
100        if LineString is None:
101            extra = "shape"
102            raise MissingExtraModule(extra)
103        if not self.data:
104            return False
105        for data in (self.data.observation, self.data.forecast):
106            if data:
107                poly = data.poly
108                if poly and path.intersects(poly):
109                    return True
110        return False
111
112    def contains(self, coord: Coord) -> bool:
113        """Returns True if the report area contains a coordinate"""
114        if not self.data:
115            return False
116        for data in (self.data.observation, self.data.forecast):
117            if data:
118                poly = data.poly
119                if poly and coord.point.within(poly):
120                    return True
121        return False

In addition to the manager, you can use the avwx.AirSigmet class like any other report when you supply the report string via parse or from_report.

>>> from avwx import AirSigmet
>>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
>>> sigmet = AirSigmet.from_report(report)
>>> sigmet.last_updated
datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
>>> sigmet.data.observation.coords
[Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
>>> sigmet.data.observation.intensity
Code(repr='NC', value='No change')
>>> sigmet.data.observation.ceiling
Number(repr='FL410', value=410, spoken='flight level four one zero')
data: avwx.structs.AirSigmetData | None = None
@staticmethod
def sanitize(report: str) -> str:
    @staticmethod
    def sanitize(report: str) -> str:
        """Sanitizes the report string using the module-level `sanitize`"""
        return sanitize(report)

Sanitizes the report string

def intersects(self, path: shapely.geometry.linestring.LineString) -> bool:
 98    def intersects(self, path: LineString) -> bool:
 99        """Returns True if the report area intersects a flight path"""
100        if LineString is None:
101            extra = "shape"
102            raise MissingExtraModule(extra)
103        if not self.data:
104            return False
105        for data in (self.data.observation, self.data.forecast):
106            if data:
107                poly = data.poly
108                if poly and path.intersects(poly):
109                    return True
110        return False

Returns True if the report area intersects a flight path

def contains(self, coord: avwx.structs.Coord) -> bool:
112    def contains(self, coord: Coord) -> bool:
113        """Returns True if the report area contains a coordinate"""
114        if not self.data:
115            return False
116        for data in (self.data.observation, self.data.forecast):
117            if data:
118                poly = data.poly
119                if poly and coord.point.within(poly):
120                    return True
121        return False

Returns True if the report area contains a coordinate

class AirSigManager:
class AirSigManager:
    """
    SIGMET and AIRMET reports are global rather than tied to one station, so
    there is no per-station report class to initialize with an ident like the
    other report types. Instead, this class manages and updates the list of
    all active SIGMET and AIRMET reports.

    ```python
    >>> from avwx import AirSigManager
    >>> from avwx.structs import Coord
    >>> manager = AirSigManager()
    >>> manager.update()
    True
    >>> manager.last_updated
    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
    >>> len(manager.reports)
    113
    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
    5
    >>> manager.reports[0].data.bulletin.type
    Code(repr='WA', value='airmet')
    >>> manager.reports[0].data.type
    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
    ```
    """

    # Services queried on every update; populated in __init__
    _services: list[Service]
    # (report, source) pairs from the most recent update that changed anything
    _raw: list[tuple[str, str | None]]
    # UTC time of the most recent update that changed anything
    last_updated: datetime | None = None
    # Report strings matching the first element of each `_raw` pair
    raw: list[str]
    # Parsed reports; stays None until an update runs with parsing enabled
    reports: list[AirSigmet] | None = None

    def __init__(self):  # type: ignore
        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
        self._raw = []
        self.raw = []

    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
        """Fetch one service and pair each non-empty report with the service root"""
        service = self._services[index]
        origin = service.root
        fetched = await service.async_fetch(timeout=timeout)  # type: ignore
        return [(text, origin) for text in fetched if text]

    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Blocking wrapper that runs `async_update` and returns its result"""
        pending = self.async_update(timeout, disable_post=disable_post)
        return aio.run(pending)

    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Fetch from every service and report whether the raw reports changed

        Returns False, leaving all state untouched, when the fetched
        (report, source) pairs equal the stored ones. Otherwise the raw data
        and `last_updated` are replaced and, unless `disable_post` is set,
        every report is parsed into `reports`.
        """
        batches = await aio.gather(*(self._update(i, timeout) for i in range(len(self._services))))
        fetched = [pair for batch in batches for pair in batch]
        if fetched == self._raw:
            return False
        self._raw = fetched
        self.raw = [pair[0] for pair in fetched]
        self.last_updated = datetime.now(tz=timezone.utc)
        if disable_post:
            return True
        parsed = []
        for text, origin in fetched:
            # A report that fails to parse is handed to the interceptor
            # rather than aborting the whole update
            try:
                obj = AirSigmet.from_report(text)
                if obj:
                    obj.source = origin
                    parsed.append(obj)
            except Exception as exc:  # noqa: BLE001
                exceptions.exception_intercept(exc, raw={"report": text})
        self.reports = parsed
        return True

    def along(self, coords: list[Coord]) -> list[AirSigmet]:
        """Returns available reports that intersect a flight path

        Raises `MissingExtraModule` when `LineString` is None. Returns an
        empty list when no reports have been parsed yet.
        """
        if LineString is None:
            extra = "shape"
            raise MissingExtraModule(extra)
        if self.reports is None:
            return []
        route = LineString([point.pair for point in coords])
        return [entry for entry in self.reports if entry.intersects(route)]

    def contains(self, coord: Coord) -> list[AirSigmet]:
        """Returns available reports that contain a coordinate

        Returns an empty list when no reports have been parsed yet.
        """
        if self.reports is None:
            return []
        return [entry for entry in self.reports if entry.contains(coord)]

Because of the global nature of these report types, we don't initialize a report class with a station ident like the other report types. Instead, we use a class to manage and update the list of all active SIGMET and AIRMET reports.

>>> from avwx import AirSigManager
>>> from avwx.structs import Coord
>>> manager = AirSigManager()
>>> manager.update()
True
>>> manager.last_updated
datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
>>> len(manager.reports)
113
>>> len(manager.contains(Coord(lat=33.12, lon=-105)))
5
>>> manager.reports[0].data.bulletin.type
Code(repr='WA', value='airmet')
>>> manager.reports[0].data.type
'AIRMET SIERRA FOR IFR AND MTN OBSCN'
last_updated: datetime.datetime | None = None
raw: list[str]
reports: list[AirSigmet] | None = None
def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
166    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
167        """Updates fetched reports and returns whether they've changed"""
168        return aio.run(self.async_update(timeout, disable_post=disable_post))

Updates fetched reports and returns whether they've changed

async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
170    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
171        """Updates fetched reports and returns whether they've changed"""
172        coros = [self._update(i, timeout) for i in range(len(self._services))]
173        data = await aio.gather(*coros)
174        raw = list(chain.from_iterable(data))
175        reports = [i[0] for i in raw]
176        if raw == self._raw:
177            return False
178        self._raw, self.raw = raw, reports
179        self.last_updated = datetime.now(tz=timezone.utc)
180        # Parse reports if not disabled
181        if not disable_post:
182            parsed = []
183            for report, source in raw:
184                try:
185                    if obj := AirSigmet.from_report(report):
186                        obj.source = source
187                        parsed.append(obj)
188                except Exception as exc:  # noqa: BLE001
189                    exceptions.exception_intercept(exc, raw={"report": report})
190            self.reports = parsed
191        return True

Updates fetched reports and returns whether they've changed

def along(self, coords: list[avwx.structs.Coord]) -> list[AirSigmet]:
193    def along(self, coords: list[Coord]) -> list[AirSigmet]:
194        """Returns available reports the intersect a flight path"""
195        if LineString is None:
196            extra = "shape"
197            raise MissingExtraModule(extra)
198        if self.reports is None:
199            return []
200        path = LineString([c.pair for c in coords])
201        return [r for r in self.reports if r.intersects(path)]

Returns available reports that intersect a flight path

def contains(self, coord: avwx.structs.Coord) -> list[AirSigmet]:
203    def contains(self, coord: Coord) -> list[AirSigmet]:
204        """Returns available reports that contain a coordinate"""
205        if self.reports is None:
206            return []
207        return [r for r in self.reports if r.contains(coord)]

Returns available reports that contain a coordinate

def sanitize(report: str) -> str:
def sanitize(report: str) -> str:
    """Sanitize an AIRMET / SIGMET report string

    Strips surrounding spaces and "=" characters, applies every `_REPLACE`
    substitution, then repairs individual whitespace-separated elements:

    - drops one stray trailing character from an altitude element (Ex: FL450Z)
    - splits a movement direction fused to its speed (Ex: NE05KT)

    Returns the repaired elements joined by single spaces.
    """
    text = report.strip(" =")
    for old, new in _REPLACE.items():
        text = text.replace(old, new)
    elements = text.split()
    # Walk from the end so an insert at pos + 1 never shifts unvisited elements
    for pos in range(len(elements) - 1, -1, -1):
        element = elements[pos]
        # Remove extra element on altitude Ex: FL450Z skip 1000FT
        if (
            len(element) > 4
            and not element[-1].isdigit()
            and not element.endswith(("FT", "M"))
            and core.is_altitude(element[:-1])
        ):
            elements[pos] = element[:-1]
        # Split attached movement direction Ex: NE05KT
        if len(element) >= 4 and element.endswith(("KT", "KMH")):
            split_at = _find_first_digit(element)
            heading = element[:split_at]
            if heading in CARDINALS:
                elements.insert(pos + 1, element[split_at:])
                elements[pos] = heading
    return " ".join(elements)

Sanitize an AIRMET / SIGMET report string

def parse(report: str, issued: datetime.date | None = None) -> tuple[avwx.structs.AirSigmetData, avwx.structs.Units]:
def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
    """Parse AIRMET / SIGMET report string

    The report is sanitized, then consumed in stages by the header,
    space/time, region, and observation helpers, each of which returns the
    elements it did not use.

    `issued` is passed as the target date for every timestamp and on to the
    observation helper. `remarks` is always None in the result.

    Returns the populated `AirSigmetData` and the units from the observation
    stage.
    """
    units = Units.international()
    sanitized = sanitize(report)
    prepared = _parse_prep(sanitized)
    data, bulletin, issuer, issue_time, correction = _header(prepared)
    data, area, report_type, start_time, end_time, station = _spacetime(data)
    # The body is the sanitized text starting where the next two elements occur
    body_start = sanitized.find(" ".join(data[:2]))
    body = sanitized[body_start:]
    # Trim AIRMET type
    if data[0] == "AIRMET":
        with suppress(ValueError):
            data = data[data.index("<elip>") + 1 :]
    data, region = _region(data)
    units, observation, forecast = _observations(data, units, issued)
    issued_at = core.make_timestamp(issue_time, target_date=issued)
    starts_at = core.make_timestamp(start_time, target_date=issued)
    ends_at = core.make_timestamp(end_time, target_date=issued)
    struct = AirSigmetData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=issued_at,
        remarks=None,
        bulletin=bulletin,
        issuer=issuer,
        correction=correction,
        area=area,
        type=report_type,
        start_time=starts_at,
        end_time=ends_at,
        body=body,
        region=region,
        observation=observation,
        forecast=forecast,
    )
    return struct, units

Parse AIRMET / SIGMET report string