avwx.current.airsigmet

A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. SIGMETs are divided into:

  • Convective - thunderstorms, hail, and cyclones
  • Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation

An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. AIRMETs are divided into:

  • Sierra - IFR conditions like low ceilings and mountain obscuration
  • Tango - turbulence and high surface winds
  • Zulu - icing and freezing levels

Both types share a similar report format and therefore are combined into a single handling class. The Bulletin and weather type can be used to classify each as a SIGMET or AIRMET for filtering purposes.

  1"""
  2A SIGMET (Significant Meteorological Information) is a weather advisory for the
  3safety of all aircraft. They are divided into:
  4
  5- Convective - thunderstorms, hail, and cyclones
  6- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation
  7
  8An AIRMET (Airman's Meteorological Information) is a weather advisory for
  9smaller aircraft or VFR navigation. They are divided into:
 10
 11- Sierra - IFR conditions like low ceilings and mountain obscuration
 12- Tango - turbulence and high surface winds
 13- Zulu - icing and freezing levels
 14
 15Both types share a similar report format and therefore are combined into a
 16single handling class. The `Bulletin` and weather type can be used to classify
 17each as a SIGMET or AIRMET for filtering purposes.
 18"""
 19
 20# stdlib
 21from __future__ import annotations
 22
 23import asyncio as aio
 24import re
 25from contextlib import suppress
 26from datetime import date, datetime, timezone
 27from itertools import chain
 28
 29# library
 30from geopy.distance import distance as geo_distance  # type: ignore
 31
 32# module
 33from avwx import exceptions
 34from avwx.base import AVWXBase
 35from avwx.exceptions import MissingExtraModule
 36from avwx.flight_path import to_coordinates
 37from avwx.load_utils import LazyLoad
 38from avwx.parsing import core
 39from avwx.service.bulk import NoaaBulk, NoaaIntl, Service
 40from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES
 41from avwx.static.core import CARDINAL_DEGREES, CARDINALS
 42from avwx.structs import (
 43    AirSigmetData,
 44    AirSigObservation,
 45    Bulletin,
 46    Code,
 47    Coord,
 48    Movement,
 49    Number,
 50    Timestamp,
 51    Units,
 52)
 53
 54try:
 55    from shapely.geometry import LineString  # type: ignore
 56except ModuleNotFoundError:
 57    LineString = None
 58
 59
 60class AirSigmet(AVWXBase):
 61    """
 62    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 63    other report when you supply the report string via `parse` or
 64    `from_report`.
 65
 66    ```python
 67    >>> from avwx import AirSigmet
 68    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 69    >>> sigmet = AirSigmet.from_report(report)
 70    True
 71    >>> sigmet.last_updated
 72    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 73    >>> sigmet.data.observation.coords
 74    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 75    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 76    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 77    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 78    >>> sigmet.data.observation.intensity
 79    Code(repr='NC', value='No change')
 80    >>> sigmet.data.observation.ceiling
 81    Number(repr='FL410', value=410, spoken='flight level four one zero')
 82    ```
 83    """
 84
 85    data: AirSigmetData | None = None
 86
 87    def _post_parse(self) -> None:
 88        if self.raw:
 89            self.data, self.units = parse(self.raw, self.issued)
 90
 91    @staticmethod
 92    def sanitize(report: str) -> str:
 93        """Sanitizes the report string"""
 94        return sanitize(report)
 95
 96    def intersects(self, path: LineString) -> bool:
 97        """Returns True if the report area intersects a flight path"""
 98        if LineString is None:
 99            extra = "shape"
100            raise MissingExtraModule(extra)
101        if not self.data:
102            return False
103        for data in (self.data.observation, self.data.forecast):
104            if data:
105                poly = data.poly
106                if poly and path.intersects(poly):
107                    return True
108        return False
109
110    def contains(self, coord: Coord) -> bool:
111        """Returns True if the report area contains a coordinate"""
112        if not self.data:
113            return False
114        for data in (self.data.observation, self.data.forecast):
115            if data:
116                poly = data.poly
117                if poly and coord.point.within(poly):
118                    return True
119        return False
120
121
class AirSigManager:
    """
    Because of the global nature of these report types, we don't initialize a
    report class with a station ident like the other report types. Instead, we
    use a class to manage and update the list of all active SIGMET and AIRMET
    reports.

    ```python
    >>> from avwx import AirSigManager
    >>> from avwx.structs import Coord
    >>> manager = AirSigManager()
    >>> manager.update()
    True
    >>> manager.last_updated
    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
    >>> len(manager.reports)
    113
    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
    5
    >>> manager.reports[0].data.bulletin.type
    Code(repr='WA', value='airmet')
    >>> manager.reports[0].data.type
    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
    ```
    """

    # Services queried on every update
    _services: list[Service]
    # (report, source) pairs from the last update; used for change detection
    _raw: list[tuple[str, str | None]]
    # Time of the last update that changed the report list
    last_updated: datetime | None = None
    # Raw report strings from the last update
    raw: list[str]
    # Parsed reports; stays None until an update runs with parsing enabled
    reports: list[AirSigmet] | None = None

    def __init__(self):  # type: ignore
        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
        self._raw = []
        self.raw = []

    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
        """Fetches one service and pairs every non-empty report with its source"""
        service = self._services[index]
        source = service.root
        fetched = await service.async_fetch(timeout=timeout)  # type: ignore
        return [(report, source) for report in fetched if report]

    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        return aio.run(self.async_update(timeout, disable_post=disable_post))

    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed

        When `disable_post` is set, the raw reports are stored but `reports`
        is left untouched.
        """
        pending = [self._update(i, timeout) for i, _ in enumerate(self._services)]
        results = await aio.gather(*pending)
        raw = list(chain.from_iterable(results))
        if raw == self._raw:
            return False
        self._raw = raw
        self.raw = [text for text, _ in raw]
        self.last_updated = datetime.now(tz=timezone.utc)
        if disable_post:
            return True
        parsed = []
        for report, source in raw:
            # One malformed report must not abort the whole batch
            try:
                obj = AirSigmet.from_report(report)
                if obj:
                    obj.source = source
                    parsed.append(obj)
            except Exception as exc:  # noqa: BLE001
                exceptions.exception_intercept(exc, raw={"report": report})
        self.reports = parsed
        return True

    def along(self, coords: list[Coord]) -> list[AirSigmet]:
        """Returns available reports that intersect a flight path

        Raises MissingExtraModule when shapely could not be imported.
        """
        if LineString is None:
            extra = "shape"
            raise MissingExtraModule(extra)
        if self.reports is None:
            return []
        path = LineString([point.pair for point in coords])
        return [report for report in self.reports if report.intersects(path)]

    def contains(self, coord: Coord) -> list[AirSigmet]:
        """Returns available reports that contain a coordinate"""
        if self.reports is None:
            return []
        return [report for report in self.reports if report.contains(coord)]
206
207
# Literal lat/lon pair with an optional trailing " -" separator
# Ex: N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141
_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?")

# Navaid reference, either as distance+bearing+ident or as a bare ident
# introduced by "-", "TO " or "FROM "
# Ex: FROM 60NW ISN-INL-TVC-GIJ-UIN-FSD-BIL-60NW ISN
# Ex: FROM 70SSW ISN TO 20NNW FAR TO 70E DLH TO 40SE EAU TO 80SE RAP TO 40NNW BFF TO 70SSW
# The ident class is [A-Za-z]: the range A-z would also accept the
# punctuation that sits between "Z" and "a" in ASCII ([ \ ] ^ _ `)
_NAVAID_PATTERN = re.compile(r"\b(\d{1,3}[NESW]{1,3} [A-Za-z]{3}-?\b)|((-|(TO )|(FROM ))[A-Za-z]{3}\b)")

# Lateral limit relative to a latitude or longitude line
# Ex: N OF N2050 AND S OF N2900
# NOTE(review): alternation binds looser than concatenation, so the leading \b
# applies only to the N/S branch and the optional " AND" plus trailing \b only
# to the E/W branch - confirm this is intended
_LATTERAL_PATTERN = re.compile(r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b")
217
# Navaid data wrapped in LazyLoad
# NOTE(review): presumably loaded on first access - confirm in avwx.load_utils
NAVAIDS = LazyLoad("navaids")

# Used to assist parsing after sanitized. Removed after parse
# Maps a literal substring to the flag token that replaces it. _parse_prep
# applies these in insertion order, so "..." must stay ahead of ".." and
# both ahead of ". " or an ellipsis would be split into several flags
_FLAGS = {
    "...": " <elip> ",
    "..": " <elip> ",
    ". ": " <break> ",
    "/VIS ": " <vis> VIS ",
}
227
228
def _parse_prep(report: str) -> list[str]:
    """Splits a sanitized report into tokens after swapping markers for flags

    Trailing periods are dropped first, then every `_FLAGS` key is replaced by
    its flag token before the text is split on whitespace.
    """
    text = report.rstrip(".")
    for marker, flag in _FLAGS.items():
        text = text.replace(marker, flag)
    return text.split()
235
236
def _clean_flags(data: list[str]) -> list[str]:
    """Returns the tokens with parse-only flags such as <elip> removed

    Any token starting with "<" is treated as a flag. `startswith` is used
    rather than indexing so an empty token is kept instead of raising
    IndexError.
    """
    return [item for item in data if not item.startswith("<")]
239
240
def _bulletin(value: str) -> Bulletin:
    """Builds a Bulletin from its header code

    Characters 0-1 are the type key (looked up in BULLETIN_TYPES), 2-3 the
    country, and the rest is the bulletin number. Raises KeyError for an
    unknown type key and ValueError when the number part is not an integer.
    """
    kind, country, number = value[:2], value[2:4], value[4:]
    code = Code(repr=kind, value=BULLETIN_TYPES[kind])
    return Bulletin(repr=value, type=code, country=country, number=int(number))
251
252
def _header(data: list[str]) -> tuple[list[str], Bulletin, str, str, str | None]:
    """Splits off the report header

    Returns the remaining tokens, the bulletin, the issuer, the issue time
    string, and the correction code. The fourth token counts as a correction
    code only when it is exactly three characters long.
    """
    bulletin = _bulletin(data[0])
    issuer, time = data[1], data[2]
    if len(data[3]) == 3:
        correction, body_start = data[3], 4
    else:
        correction, body_start = None, 3
    return data[body_start:], bulletin, issuer, time, correction
257
258
def _spacetime(
    data: list[str],
) -> tuple[list[str], str, str, str | None, str, str | None]:
    """Extracts the area, report type, validity period and station

    Returns the remaining tokens, area, report type, start time string (None
    for "VALID UNTIL"), end time string, and station (None when absent).
    The area token is popped from the list that was passed in.
    Raises ValueError when there is no "VALID" token.
    """
    area = data.pop(0)
    # Skip airmet type + time repeat
    if data[0] == "WA" and data[1].isdigit():
        data = data[2:]
        # Remove type label from 3-letter ident
        area = area[:-1]
    split_at = data.index("VALID")
    report_type = " ".join(data[:split_at])
    data = data[split_at + 1 :]
    if data[0] == "UNTIL":
        start_time, end_time = None, data[1]
        data = data[2:]
    else:
        period = data.pop(0)
        separator = "-" if "-" in period else "/"
        start_time, end_time = period.split(separator)
    station = None
    # KMCO- ORL FIR
    if data[0][-1] == "-":
        station = data.pop(0)[:-1]
    # KMCO - KMCO
    elif data[1] == "-" and len(data[0]) == 4:
        station = data.pop(0)
        data.pop(0)
    return data, area, report_type, start_time, end_time, station
287
288
def _first_index(data: list[str], *targets: str) -> int:
    """Returns the index of the first target found in data, or -1

    Targets are tried in the order given, so an earlier target wins even when
    a later one sits nearer the front of the list.
    """
    for target in targets:
        if target in data:
            return data.index(target)
    return -1
294
295
def _region(data: list[str]) -> tuple[list[str], str]:
    """Splits the leading region name off the token list

    The name ends after "FIR" / "CTA" when present, otherwise just before
    "FROM", otherwise after the leading run of two-letter tokens (state list).
    Returns the remaining tokens and the region name, which may be empty.
    """
    # FIR/CTA region name
    name_end = _first_index(data, "FIR", "CTA") + 1
    if not name_end:
        # Or non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW
        # The -1 "not found" result is clamped to 0: used as a slice end it
        # would put every token but the last into the name
        name_end = max(_first_index(data, "FROM"), 0)
    # State list
    if not name_end:
        for item in data:
            if len(item) != 2:
                break
            name_end += 1
    return data[name_end:], " ".join(data[:name_end])
309
310
def _time(data: list[str], issued: date | None = None) -> tuple[list[str], Timestamp | None, Timestamp | None]:
    """Extracts the start and/or end time based on a couple starting elements

    Looks for the first of AT, FCST, UNTIL, VALID, OUTLOOK, OTLK and reads the
    token after it. A "start-end" token sets both times. A token starting with
    four digits is a single time: the end for UNTIL / VALID, otherwise the
    start. Consumed tokens are removed from `data` in place.

    Returns the remaining tokens, the start time and the end time.
    """
    index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK")
    if index == -1:
        return data, None, None
    keyword = data.pop(index)
    start, end, observed = None, None, None
    # The keyword can be the last token, leaving no time value to read
    value = data[index] if index < len(data) else ""
    if "-" in value:
        start_text, end_text = data.pop(index).split("-")
        start = core.make_timestamp(start_text, time_only=len(start_text) < 6, target_date=issued)
        end = core.make_timestamp(end_text, time_only=len(end_text) < 6, target_date=issued)
    elif len(value) >= 4 and value[:4].isdigit():
        observed = core.make_timestamp(data.pop(index), time_only=True, target_date=issued)
        # Drop the "OBS" from "OBS AT 0510Z" along with the time
        if index > 0 and data[index - 1] == "OBS":
            data.pop(index - 1)
    # Remove one leftover occurrence of each remaining time keyword
    for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"):
        with suppress(ValueError):
            data.remove(remv)
    if observed:
        if keyword in ("UNTIL", "VALID"):
            end = observed
        else:
            start = observed
    return data, start, end
335
336
def _coord_value(value: str) -> float:
    """Converts a coordinate element such as S0406 or W07103 into a float

    Latitudes (N/S) keep two digits before the decimal point and longitudes
    (E/W) three. S and W become a minus sign. The trailing digits are used
    verbatim as the fractional part, so N1429 becomes 14.29.
    """
    is_latitude = value[0] in {"N", "S"}
    split_at = 3 if is_latitude else 4
    positive, negative = ("N", "S") if is_latitude else ("E", "W")
    text = value[:split_at] + "." + value[split_at:]
    text = text.lstrip(positive)
    text = text.replace(negative, "-")
    return float(text)
344
345
def _position(data: list[str]) -> tuple[list[str], Coord | None]:
    """Extracts the coordinate that follows a PSN token

    Returns the token list with "PSN" and both coordinate elements removed in
    place, plus the coordinate. Returns None when there is no "PSN" token.
    """
    if "PSN" not in data:
        return data, None
    index = data.index("PSN")
    del data[index]
    # Read both elements before popping so repr keeps the original text
    raw = f"{data[index]} {data[index + 1]}"
    lat = _coord_value(data.pop(index))
    lon = _coord_value(data.pop(index))
    return data, Coord(lat=lat, lon=lon, repr=raw)
356
357
def _movement(data: list[str], units: Units) -> tuple[list[str], Units, Movement | None]:
    """Extracts the movement element (STNR or MOV ...) from the token list

    Tokens are removed from `data` in place and `units.wind_speed` is set to
    the speed unit that was found. Returns the remaining tokens, the units,
    and the movement, which is None when absent or given as "MOV CNL".
    """
    # Stationary wins over any MOV element. remove() raises ValueError when
    # STNR is absent, which skips straight past this block
    # NOTE(review): a ValueError from make_number / Movement would also be
    # suppressed here, after STNR has already been removed
    with suppress(ValueError):
        data.remove("STNR")
        speed = core.make_number("STNR")
        return data, units, Movement(repr="STNR", direction=None, speed=speed)
    try:
        index = data.index("MOV")
    except ValueError:
        return data, units, None
    # After each pop the next element slides into `index`
    raw = data.pop(index)
    direction_str = data.pop(index)
    # MOV CNL
    if direction_str == "CNL":
        return data, units, None
    raw += f" {direction_str} "
    # MOV FROM 23040KT
    if direction_str == "FROM":
        # The first three characters are the direction; the rest is left in
        # place to be read as the speed below
        value = data[index][:3]
        raw += value
        direction = core.make_number(value)
        data[index] = data[index][3:]
    # MOV E 45KMH
    else:
        direction = core.make_number(direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES)
    speed = None
    # IndexError here means nothing follows the direction, so there is no speed
    with suppress(IndexError):
        kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH")
        if kt_unit or kmh_unit:
            units.wind_speed = "kmh" if kmh_unit else "kt"
            speed_str = data.pop(index)
            raw += speed_str
            # Remove bottom speed Ex: MOV W 05-10KT
            if "-" in speed_str:
                speed_str = speed_str[speed_str.find("-") + 1 :]
            # Strip the unit suffix: three characters for KMH, two for KT
            speed = core.make_number(speed_str[: -3 if kmh_unit else -2])
    return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed)
394
395
def _info_from_match(match: re.Match, start: int) -> tuple[str, int]:
    """Returns the matched text and the start position to carry forward

    A `start` other than -1 is passed through unchanged; -1 is replaced by
    the position where this match begins.
    """
    position = match.start() if start == -1 else start
    return match.group(), position
401
402
def _pre_break(report: str) -> str:
    """Returns the text before the first " <break> " flag, or all of it"""
    head, _, _ = report.partition(" <break> ")
    return head
406
407
def _bounds_from_latterals(report: str, start: int) -> tuple[str, list[str], int]:
    """Extract coordinate latterals from report Ex: N OF N2050

    Only the text before the first break flag is searched. Every occurrence
    of a matched string is replaced by a single space across the whole report.
    Returns the updated report, the bounds, and the position of the first
    match when `start` came in as -1.
    """
    bounds = []
    searchable = _pre_break(report)
    for match in _LATTERAL_PATTERN.finditer(searchable):
        if start == -1:
            start = match.start()
        text = match.group()
        bounds.append(text.removesuffix(" AND"))
        report = report.replace(text, " ")
    return report, bounds, start
416
417
def _coords_from_text(report: str, start: int) -> tuple[str, list[Coord], int]:
    """Extract raw coordinate values from report Ex: N4409 E01506

    Only the text before the first break flag is searched. Every occurrence
    of a matched string is replaced by a single space across the whole report.
    Returns the updated report, the coordinates in match order, and the
    position of the first match when `start` came in as -1.
    """
    coords = []
    searchable = _pre_break(report)
    for match in _COORD_PATTERN.finditer(searchable):
        if start == -1:
            start = match.start()
        matched = match.group()
        # Drop the optional " -" separator captured with the pair
        pair = matched.strip(" -")
        lat_text, lon_text = pair.split()
        coords.append(Coord(lat=_coord_value(lat_text), lon=_coord_value(lon_text), repr=pair))
        report = report.replace(matched, " ")
    return report, coords, start
429
430
def _coords_from_navaids(report: str, start: int) -> tuple[str, list[Coord], int]:
    """Extract navaid referenced coordinates from report Ex: 30SSW BNA

    Only the text before the first break flag is searched, and each matched
    string is replaced by a single space in the report. Returns the updated
    report, the coordinates in match order, and the position of the first
    match when `start` came in as -1.
    """
    coords, navs = [], []
    for match in _NAVAID_PATTERN.finditer(_pre_break(report)):
        group, start = _info_from_match(match, start)
        report = report.replace(group, " ")
        # Reduce to "30SSW BNA" or "BNA" by dropping separators and keywords
        group = group.strip("-").removeprefix("FROM ").removeprefix("TO ")
        # (text, vector, ident) for offset references, (text, ident) otherwise
        navs.append((group, *group.split()))
    # The ident is the last element of each tuple
    # NOTE(review): assumes to_coordinates returns one Coord per ident, in
    # order, since locs is indexed in step with navs below - confirm
    locs = to_coordinates([n[2 if len(n) == 3 else 1] for n in navs])
    for i, nav in enumerate(navs):
        value = nav[0]
        if len(nav) == 3:
            # Split a vector such as "30SSW" into its leading digits and the
            # cardinal direction that follows
            vector, num_index = nav[1], 0
            while vector[num_index].isdigit():
                num_index += 1
            distance, bearing = (
                int(vector[:num_index]),
                CARDINAL_DEGREES[vector[num_index:]],
            )
            # Project from the navaid by `distance` nautical miles along `bearing`
            loc = geo_distance(nautical=distance).destination(locs[i].pair, bearing=bearing)
            coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=value)
        else:
            # Bare ident: use the navaid location itself, relabelled with the
            # report text
            coord = locs[i]
            coord.repr = value
        coords.append(coord)
    return report, coords, start
457
458
def _bounds(data: list[str]) -> tuple[list[str], list[Coord], list[str]]:
    """Extract coordinate bounds by coord, navaid, and latterals

    Returns the remaining tokens, the coordinates (literal pairs first, then
    navaid-derived ones), and the lateral bound strings.
    """
    # The extractors work on the joined text. `start` stays -1 until one of
    # them finds a match and then holds that first match's position
    report, start = " ".join(data), -1
    report, bounds, start = _bounds_from_latterals(report, start)
    report, coords, start = _coords_from_text(report, start)
    report, navs, start = _coords_from_navaids(report, start)
    coords += navs
    # Move the cut point back to an introducing keyword that precedes it
    for target in ("FROM", "WI", "BOUNDED", "OBS"):
        index = report.find(f"{target} ")
        if index != -1 and index < start:
            start = index
    # Drop everything from the cut point up to the last double space. Each
    # removed match was replaced by " ", which is where double spaces come from
    # NOTE(review): with no match at all (start == -1, rfind == -1) this
    # rebuilds the report unchanged, which appears to rely on the joined
    # tokens containing no double space - confirm
    report = report[:start] + report[report.rfind("  ") :]
    data = [s for s in report.split() if s]
    return data, coords, bounds
473
474
def _altitudes(data: list[str], units: Units) -> tuple[list[str], Units, Number | None, Number | None]:
    """Extract the floor and ceiling altitudes

    Only the first altitude expression found is consumed. Supported forms:
    "BTN <floor> AND <ceiling>", "TOP/TOPS/BLW [ABV|BLW|TO] <value>", and a
    standalone altitude token such as FL060/300 or SFC/FL160. A keyword
    without enough tokens after it is skipped rather than raising IndexError.

    Returns the remaining tokens, the units, the floor and the ceiling.
    """
    floor, ceiling = None, None
    for i, item in enumerate(data):
        # Number of tokens available after this one
        following = len(data) - i - 1
        # BTN FL180 AND FL330
        if item == "BTN" and following >= 3 and data[i + 2] == "AND":
            floor, units = core.make_altitude(data[i + 1], units)
            ceiling, units = core.make_altitude(data[i + 3], units)
            data = data[:i] + data[i + 4 :]
            break
        if item in ("TOP", "TOPS", "BLW") and following >= 1:
            qualifier = data[i + 1]
            # TOPS ABV FL450 / TOPS BLW FL450
            if qualifier in ("ABV", "BLW"):
                if following < 2:
                    continue
                ceiling = core.make_number(f"{qualifier} {data[i + 2]}")
                data = data[:i] + data[i + 3 :]
                break
            # TOPS TO FL310: the value sits one token further along
            value_index = i + 1
            if qualifier == "TO":
                if following < 2:
                    continue
                value_index += 1
            ceiling, units = core.make_altitude(data[value_index], units)
            data = data[:i] + data[value_index + 1 :]
            # CIG BLW 010
            # i > 0 stops index -1 from wrapping round to the last token
            if i > 0 and data[i - 1] == "CIG":
                data.pop(i - 1)
            break
        # FL060/300 SFC/FL160
        if core.is_altitude(item):
            if "/" in item:
                floor_val, ceiling_val = item.split("/")
                floor, units = core.make_altitude(floor_val, units)
                # A bare ceiling after an SFC or FL floor is read as a flight level
                if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[:2] != "FL":
                    ceiling, units = core.make_altitude(ceiling_val, units, force_fl=True)
                else:
                    ceiling, units = core.make_altitude(ceiling_val, units)
            else:
                ceiling, units = core.make_altitude(item, units)
            data.pop(i)
            break
    return data, units, floor, ceiling
518
519
def _weather_type(data: list[str]) -> tuple[list[str], Code | None]:
    """Finds the first WEATHER_TYPES key contained in the joined tokens

    Keys are tried in WEATHER_TYPES order and matched as substrings. On a hit,
    every occurrence of the key is removed and the text is re-split. Returns
    the tokens unchanged and None when no key matches.
    """
    report = " ".join(data)
    for key, val in WEATHER_TYPES.items():
        if key not in report:
            continue
        remaining = report.replace(key, "").split()
        return remaining, Code(repr=key, value=val)
    return data, None
529
530
def _intensity(data: list[str]) -> tuple[list[str], Code | None]:
    """Pops the final token when it is a known INTENSITY code

    Returns the token list and the intensity. The list is unchanged and the
    intensity is None when the list is empty or its last token is not an
    INTENSITY key.
    """
    if not data or data[-1] not in INTENSITY:
        return data, None
    code = data.pop()
    return data, Code(repr=code, value=INTENSITY[code])
540
541
def _sigmet_observation(data: list[str], units: Units, issued: date | None = None) -> tuple[AirSigObservation, Units]:
    """Parses one observation or forecast section into an AirSigObservation

    The extractors run in a fixed order, each consuming its own tokens. What
    is left, minus flag tokens, is stored in `other`.
    """
    tokens, start_time, end_time = _time(data, issued)
    tokens, position = _position(tokens)
    tokens, coords, bounds = _bounds(tokens)
    tokens, units, movement = _movement(tokens, units)
    # Intensity is read from the final token, so movement must already be gone
    tokens, intensity = _intensity(tokens)
    tokens, units, floor, ceiling = _altitudes(tokens, units)
    tokens, weather = _weather_type(tokens)
    section = AirSigObservation(
        type=weather,
        start_time=start_time,
        end_time=end_time,
        position=position,
        floor=floor,
        ceiling=ceiling,
        coords=coords,
        bounds=bounds,
        movement=movement,
        intensity=intensity,
        other=_clean_flags(tokens),
    )
    return section, units
564
565
def _observations(
    data: list[str], units: Units, issued: date | None = None
) -> tuple[Units, AirSigObservation | None, AirSigObservation | None]:
    """Splits the body into observation and forecast sections and parses each

    With no FCST / OUTLOOK / OTLK token the whole body is an observation.
    With one among the first six tokens the whole body is a forecast.
    Otherwise the body is split at that token.
    """
    observation, forecast = None, None
    split_at = _first_index(data, "FCST", "OUTLOOK", "OTLK")
    if split_at == -1:
        observation, units = _sigmet_observation(data, units, issued)
    # 6 is arbitrary. Will likely change or be more precise later
    elif split_at < 6:
        forecast, units = _sigmet_observation(data, units, issued)
    else:
        observation, units = _sigmet_observation(data[:split_at], units, issued)
        forecast, units = _sigmet_observation(data[split_at:], units, issued)
    return units, observation, forecast
580
581
# Literal substitutions applied by sanitize() before the report is tokenized.
# The keys fix a split "MOV", normalize stationary spellings to STNR, and
# collapse hyphenated cardinal directions. Keys carry surrounding spaces so
# they match at element boundaries
_REPLACE = {
    " MO V ": " MOV ",
    " STNRY": " STNR",
    " STCNRY": " STNR",
    " N-NE ": " NNE ",
    " N-NW ": " NNW ",
    " E-NE ": " ENE ",
    " E-SE ": " ESE ",
    " S-SE ": " SSE ",
    " S-SW ": " SSW ",
    " W-SW ": " WSW ",
    " W-NW ": " WNW ",
}
595
596
def _find_first_digit(item: str) -> int:
    """Returns the index of the first digit character in item, or -1"""
    for position, char in enumerate(item):
        if char.isdigit():
            return position
    return -1
599
600
def sanitize(report: str) -> str:
    """Sanitized AIRMET / SIGMET report string

    Strips surrounding spaces and "=", applies the `_REPLACE` substitutions,
    then fixes individual elements: a stray trailing character on an altitude
    is dropped and a direction fused to a speed is split in two.
    """
    report = report.strip(" =")
    for wrong, right in _REPLACE.items():
        report = report.replace(wrong, right)
    data = report.split()
    # Walk backwards so inserting at i + 1 never shifts an unvisited element
    for i in range(len(data) - 1, -1, -1):
        item = data[i]
        # Remove extra element on altitude Ex: FL450Z skip 1000FT
        if (
            len(item) > 4
            and not item[-1].isdigit()
            and not item.endswith(("FT", "M"))
            and core.is_altitude(item[:-1])
        ):
            data[i] = item[:-1]
        # Split attached movement direction Ex: NE05KT
        if len(item) >= 4 and item.endswith(("KT", "KMH")):
            digit_at = _find_first_digit(item)
            direction = item[:digit_at]
            if direction in CARDINALS:
                data.insert(i + 1, item[digit_at:])
                data[i] = direction
    return " ".join(data)
624
625
def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
    """Parse AIRMET / SIGMET report string

    `issued` is passed to the timestamp helpers as their target date.
    Returns the parsed data together with the units found while parsing,
    starting from `Units.international()`.
    """
    units = Units.international()
    sanitized = sanitize(report)
    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
    data, area, report_type, start_time, end_time, station = _spacetime(data)
    # The body starts where the first two remaining tokens appear in the
    # sanitized text. Those tokens can include flags such as <break> that are
    # not in the text; keep the whole report then, since slicing from -1
    # would leave only its last character
    body_start = sanitized.find(" ".join(data[:2]))
    body = sanitized[body_start:] if body_start != -1 else sanitized
    # Trim AIRMET type
    if data[0] == "AIRMET":
        with suppress(ValueError):
            data = data[data.index("<elip>") + 1 :]
    data, region = _region(data)
    units, observation, forecast = _observations(data, units, issued)
    struct = AirSigmetData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=None,
        bulletin=bulletin,
        issuer=issuer,
        correction=correction,
        area=area,
        type=report_type,
        start_time=core.make_timestamp(start_time, target_date=issued),
        end_time=core.make_timestamp(end_time, target_date=issued),
        body=body,
        region=region,
        observation=observation,
        forecast=forecast,
    )
    return struct, units
class AirSigmet(avwx.base.AVWXBase):
 61class AirSigmet(AVWXBase):
 62    """
 63    In addition to the manager, you can use the `avwx.AirSigmet` class like any
 64    other report when you supply the report string via `parse` or
 65    `from_report`.
 66
 67    ```python
 68    >>> from avwx import AirSigmet
 69    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
 70    >>> sigmet = AirSigmet.from_report(report)
 71    True
 72    >>> sigmet.last_updated
 73    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
 74    >>> sigmet.data.observation.coords
 75    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
 76    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
 77    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
 78    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
 79    >>> sigmet.data.observation.intensity
 80    Code(repr='NC', value='No change')
 81    >>> sigmet.data.observation.ceiling
 82    Number(repr='FL410', value=410, spoken='flight level four one zero')
 83    ```
 84    """
 85
 86    data: AirSigmetData | None = None
 87
 88    def _post_parse(self) -> None:
 89        if self.raw:
 90            self.data, self.units = parse(self.raw, self.issued)
 91
 92    @staticmethod
 93    def sanitize(report: str) -> str:
 94        """Sanitizes the report string"""
 95        return sanitize(report)
 96
 97    def intersects(self, path: LineString) -> bool:
 98        """Returns True if the report area intersects a flight path"""
 99        if LineString is None:
100            extra = "shape"
101            raise MissingExtraModule(extra)
102        if not self.data:
103            return False
104        for data in (self.data.observation, self.data.forecast):
105            if data:
106                poly = data.poly
107                if poly and path.intersects(poly):
108                    return True
109        return False
110
111    def contains(self, coord: Coord) -> bool:
112        """Returns True if the report area contains a coordinate"""
113        if not self.data:
114            return False
115        for data in (self.data.observation, self.data.forecast):
116            if data:
117                poly = data.poly
118                if poly and coord.point.within(poly):
119                    return True
120        return False

In addition to the manager, you can use the avwx.AirSigmet class like any other report when you supply the report string via parse or from_report.

>>> from avwx import AirSigmet
>>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
>>> sigmet = AirSigmet.from_report(report)
>>> sigmet is not None
True
>>> sigmet.last_updated
datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
>>> sigmet.data.observation.coords
[Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
>>> sigmet.data.observation.intensity
Code(repr='NC', value='No change')
>>> sigmet.data.observation.ceiling
Number(repr='FL410', value=410, spoken='flight level four one zero')
data: avwx.structs.AirSigmetData | None = None
@staticmethod
def sanitize(report: str) -> str:
92    @staticmethod
93    def sanitize(report: str) -> str:
94        """Sanitizes the report string"""
95        return sanitize(report)

Sanitizes the report string

def intersects(self, path: shapely.geometry.linestring.LineString) -> bool:
 97    def intersects(self, path: LineString) -> bool:
 98        """Returns True if the report area intersects a flight path"""
 99        if LineString is None:
100            extra = "shape"
101            raise MissingExtraModule(extra)
102        if not self.data:
103            return False
104        for data in (self.data.observation, self.data.forecast):
105            if data:
106                poly = data.poly
107                if poly and path.intersects(poly):
108                    return True
109        return False

Returns True if the report area intersects a flight path

def contains(self, coord: avwx.structs.Coord) -> bool:
111    def contains(self, coord: Coord) -> bool:
112        """Returns True if the report area contains a coordinate"""
113        if not self.data:
114            return False
115        for data in (self.data.observation, self.data.forecast):
116            if data:
117                poly = data.poly
118                if poly and coord.point.within(poly):
119                    return True
120        return False

Returns True if the report area contains a coordinate

class AirSigManager:
class AirSigManager:
    """
    SIGMET and AIRMET reports are global rather than tied to one station, so
    there is no station ident to initialize a report class with like the other
    report types. Instead, this class manages and updates the list of all
    active SIGMET and AIRMET reports.

    ```python
    >>> from avwx import AirSigManager
    >>> from avwx.structs import Coord
    >>> manager = AirSigManager()
    >>> manager.update()
    True
    >>> manager.last_updated
    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
    >>> len(manager.reports)
    113
    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
    5
    >>> manager.reports[0].data.bulletin.type
    Code(repr='WA', value='airmet')
    >>> manager.reports[0].data.type
    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
    ```
    """

    # Services queried on every update
    _services: list[Service]
    # (report text, source) pairs from the most recent changed fetch
    _raw: list[tuple[str, str | None]]
    # UTC time of the most recent update that changed the fetched reports
    last_updated: datetime | None = None
    # Report text only, in the same order as _raw
    raw: list[str]
    # Parsed reports; stays None until an update runs with parsing enabled
    reports: list[AirSigmet] | None = None

    def __init__(self):  # type: ignore
        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
        self._raw = []
        self.raw = []

    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
        """Fetch from the service at ``index`` and pair each non-empty report with its root"""
        service = self._services[index]
        source = service.root
        fetched = await service.async_fetch(timeout=timeout)  # type: ignore
        pairs: list[tuple[str, str | None]] = [(text, source) for text in fetched if text]
        return pairs

    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        pending = self.async_update(timeout, disable_post=disable_post)
        return aio.run(pending)

    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        tasks = [self._update(index, timeout) for index, _ in enumerate(self._services)]
        batches = await aio.gather(*tasks)
        fetched = [pair for batch in batches for pair in batch]
        # Identical to the previous fetch: leave all stored state untouched
        if fetched == self._raw:
            return False
        self._raw = fetched
        self.raw = [text for text, _ in fetched]
        self.last_updated = datetime.now(tz=timezone.utc)
        if disable_post:
            return True
        # Parse every report; a failure is handed to the exception
        # interceptor together with the offending report text
        parsed = []
        for text, source in fetched:
            try:
                if sigmet := AirSigmet.from_report(text):
                    sigmet.source = source
                    parsed.append(sigmet)
            except Exception as exc:  # noqa: BLE001
                exceptions.exception_intercept(exc, raw={"report": text})
        self.reports = parsed
        return True

    def along(self, coords: list[Coord]) -> list[AirSigmet]:
        """Returns available reports that intersect a flight path"""
        if LineString is None:
            missing = "shape"
            raise MissingExtraModule(missing)
        if self.reports is None:
            return []
        route = LineString([point.pair for point in coords])
        return [report for report in self.reports if report.intersects(route)]

    def contains(self, coord: Coord) -> list[AirSigmet]:
        """Returns available reports that contain a coordinate"""
        known = self.reports
        if known is None:
            return []
        return [report for report in known if report.contains(coord)]

Because of the global nature of these report types, we don't initialize a report class with a station ident like the other report types. Instead, we use a class to manage and update the list of all active SIGMET and AIRMET reports.

>>> from avwx import AirSigManager
>>> from avwx.structs import Coord
>>> manager = AirSigManager()
>>> manager.update()
True
>>> manager.last_updated
datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
>>> len(manager.reports)
113
>>> len(manager.contains(Coord(lat=33.12, lon=-105)))
5
>>> manager.reports[0].data.bulletin.type
Code(repr='WA', value='airmet')
>>> manager.reports[0].data.type
'AIRMET SIERRA FOR IFR AND MTN OBSCN'
last_updated: datetime.datetime | None = None
raw: list[str]
reports: list[AirSigmet] | None = None
def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
165    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
166        """Updates fetched reports and returns whether they've changed"""
167        return aio.run(self.async_update(timeout, disable_post=disable_post))

Updates fetched reports and returns whether they've changed

async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
169    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
170        """Updates fetched reports and returns whether they've changed"""
171        coros = [self._update(i, timeout) for i in range(len(self._services))]
172        data = await aio.gather(*coros)
173        raw = list(chain.from_iterable(data))
174        reports = [i[0] for i in raw]
175        if raw == self._raw:
176            return False
177        self._raw, self.raw = raw, reports
178        self.last_updated = datetime.now(tz=timezone.utc)
179        # Parse reports if not disabled
180        if not disable_post:
181            parsed = []
182            for report, source in raw:
183                try:
184                    if obj := AirSigmet.from_report(report):
185                        obj.source = source
186                        parsed.append(obj)
187                except Exception as exc:  # noqa: BLE001
188                    exceptions.exception_intercept(exc, raw={"report": report})
189            self.reports = parsed
190        return True

Updates fetched reports and returns whether they've changed

def along( self, coords: list[avwx.structs.Coord]) -> list[AirSigmet]:
192    def along(self, coords: list[Coord]) -> list[AirSigmet]:
193        """Returns available reports the intersect a flight path"""
194        if LineString is None:
195            extra = "shape"
196            raise MissingExtraModule(extra)
197        if self.reports is None:
198            return []
199        path = LineString([c.pair for c in coords])
200        return [r for r in self.reports if r.intersects(path)]

Returns available reports that intersect a flight path

def contains( self, coord: avwx.structs.Coord) -> list[AirSigmet]:
202    def contains(self, coord: Coord) -> list[AirSigmet]:
203        """Returns available reports that contain a coordinate"""
204        if self.reports is None:
205            return []
206        return [r for r in self.reports if r.contains(coord)]

Returns available reports that contain a coordinate

def sanitize(report: str) -> str:
def sanitize(report: str) -> str:
    """Sanitize an AIRMET / SIGMET report string.

    Strips surrounding spaces and ``=`` characters, applies every ``_REPLACE``
    substitution, then repairs individual elements: a stray trailing character
    after an altitude is dropped, and a movement direction attached to a speed
    is split into two elements. The elements are rejoined with single spaces.
    """
    cleaned = report.strip(" =")
    for old, new in _REPLACE.items():
        cleaned = cleaned.replace(old, new)
    tokens = cleaned.split()
    # Walk a snapshot back to front so that inserting at i + 1 never shifts
    # an element that has yet to be visited
    for i, token in reversed(list(enumerate(tokens))):
        stem, suffix = token[:-1], token[-1:]
        # Remove extra element on altitude Ex: FL450Z skip 1000FT
        if (
            len(token) > 4
            and not suffix.isdigit()
            and not token.endswith("FT")
            and suffix != "M"
            and core.is_altitude(stem)
        ):
            tokens[i] = stem
        # Split attached movement direction Ex: NE05KT
        if len(token) >= 4 and token.endswith(("KT", "KMH")):
            split_at = _find_first_digit(token)
            if token[:split_at] in CARDINALS:
                tokens.insert(i + 1, token[split_at:])
                tokens[i] = token[:split_at]
    return " ".join(tokens)

Sanitizes an AIRMET / SIGMET report string

def parse( report: str, issued: datetime.date | None = None) -> tuple[avwx.structs.AirSigmetData, avwx.structs.Units]:
def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
    """Parse an AIRMET / SIGMET report string into its data and units.

    The report is sanitized, then the header, the space/time section, the
    region, and the observation/forecast sections are extracted in turn.
    ``issued`` is passed as the target date when building timestamps and when
    parsing the observations. ``raw`` keeps the report exactly as given.
    """
    units = Units.international()
    sanitized = sanitize(report)
    tokens, bulletin, issuer, issue_time, correction = _header(_parse_prep(sanitized))
    tokens, area, report_type, start_time, end_time, station = _spacetime(tokens)
    # The body starts where the first two remaining elements appear in the
    # sanitized text
    body_start = sanitized.find(" ".join(tokens[:2]))
    body = sanitized[body_start:]
    # Trim AIRMET type
    if tokens[0] == "AIRMET":
        with suppress(ValueError):
            marker = tokens.index("<elip>")
            tokens = tokens[marker + 1 :]
    tokens, region = _region(tokens)
    units, observation, forecast = _observations(tokens, units, issued)
    issued_at = core.make_timestamp(issue_time, target_date=issued)
    valid_from = core.make_timestamp(start_time, target_date=issued)
    valid_until = core.make_timestamp(end_time, target_date=issued)
    parsed = AirSigmetData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=issued_at,
        remarks=None,
        bulletin=bulletin,
        issuer=issuer,
        correction=correction,
        area=area,
        type=report_type,
        start_time=valid_from,
        end_time=valid_until,
        body=body,
        region=region,
        observation=observation,
        forecast=forecast,
    )
    return parsed, units

Parse AIRMET / SIGMET report string