avwx.current.airsigmet
A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. They are divided into:
- Convective - thunderstorms, hail, and cyclones
- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation
An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. They are divided into:
- Sierra - IFR conditions like low ceilings and mountain obscuration
- Tango - turbulence and high surface winds
- Zulu - icing and freezing levels
Both types share a similar report format and therefore are combined into a
single handling class. The Bulletin and weather type can be used to classify
each as a SIGMET or AIRMET for filtering purposes.
1""" 2A SIGMET (Significant Meteorological Information) is a weather advisory for the 3safety of all aircraft. They are divided into: 4 5- Convective - thunderstorms, hail, and cyclones 6- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation 7 8An AIRMET (Airman's Meteorological Information) is a weather advisory for 9smaller aircraft or VFR navigation. They are divided into: 10 11- Sierra - IFR conditions like low ceilings and mountain obscuration 12- Tango - turbulence and high surface winds 13- Zulu - icing and freezing levels 14 15Both types share a similar report format and therefore are combined into a 16single handling class. The `Bulletin` and weather type can be used to classify 17each as a SIGMET or AIRMET for filtering purposes. 18""" 19 20# stdlib 21from __future__ import annotations 22 23import asyncio as aio 24import re 25from contextlib import suppress 26from datetime import date, datetime, timezone 27from itertools import chain 28from typing import TypeAlias 29 30# library 31from geopy.distance import distance as geo_distance # type: ignore 32 33# module 34from avwx import exceptions 35from avwx.base import AVWXBase 36from avwx.exceptions import MissingExtraModule 37from avwx.flight_path import to_coordinates 38from avwx.load_utils import LazyLoad 39from avwx.parsing import core 40from avwx.service.bulk import NoaaBulk, NoaaIntl, Service 41from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES 42from avwx.static.core import CARDINAL_DEGREES, CARDINALS 43from avwx.structs import ( 44 AirSigmetData, 45 AirSigObservation, 46 Bulletin, 47 Code, 48 Coord, 49 Movement, 50 Number, 51 Timestamp, 52 Units, 53) 54 55try: 56 from shapely.geometry import LineString 57except ModuleNotFoundError: 58 LineString = TypeAlias # type: ignore 59 60 61class AirSigmet(AVWXBase): 62 """ 63 In addition to the manager, you can use the `avwx.AirSigmet` class like any 64 other report when you supply the report string via 
`parse` or 65 `from_report`. 66 67 ```python 68 >>> from avwx import AirSigmet 69 >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC=' 70 >>> sigmet = AirSigmet.from_report(report) 71 True 72 >>> sigmet.last_updated 73 datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc) 74 >>> sigmet.data.observation.coords 75 [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'), 76 Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'), 77 Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'), 78 Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')] 79 >>> sigmet.data.observation.intensity 80 Code(repr='NC', value='No change') 81 >>> sigmet.data.observation.ceiling 82 Number(repr='FL410', value=410, spoken='flight level four one zero') 83 ``` 84 """ 85 86 data: AirSigmetData | None = None 87 88 def _post_parse(self) -> None: 89 if self.raw: 90 self.data, self.units = parse(self.raw, self.issued) 91 92 @staticmethod 93 def sanitize(report: str) -> str: 94 """Sanitizes the report string""" 95 return sanitize(report) 96 97 def intersects(self, path: LineString) -> bool: 98 """Returns True if the report area intersects a flight path""" 99 if LineString is None: 100 extra = "shape" 101 raise MissingExtraModule(extra) 102 if not self.data: 103 return False 104 for data in (self.data.observation, self.data.forecast): 105 if data: 106 poly = data.poly 107 if poly and path.intersects(poly): 108 return True 109 return False 110 111 def contains(self, coord: Coord) -> bool: 112 """Returns True if the report area contains a coordinate""" 113 if not self.data: 114 return False 115 for data in (self.data.observation, self.data.forecast): 116 if data: 117 poly = data.poly 118 if poly and coord.point.within(poly): 119 return True 120 return False 121 122 123class AirSigManager: 124 """ 125 Because of the global nature of these report 
types, we don't initialize a 126 report class with a station ident like the other report types. Instead, we 127 use a class to manage and update the list of all active SIGMET and AIRMET 128 reports. 129 130 ```python 131 >>> from avwx import AirSigManager 132 >>> from avwx.structs import Coord 133 >>> manager = AirSigManager() 134 >>> manager.update() 135 True 136 >>> manager.last_updated 137 datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc) 138 >>> len(manager.reports) 139 113 140 >>> len(manager.contains(Coord(lat=33.12, lon=-105))) 141 5 142 >>> manager.reports[0].data.bulletin.type 143 Code(repr='WA', value='airmet') 144 >>> manager.reports[0].data.type 145 'AIRMET SIERRA FOR IFR AND MTN OBSCN' 146 ``` 147 """ 148 149 _services: list[Service] 150 _raw: list[tuple[str, str | None]] 151 last_updated: datetime | None = None 152 raw: list[str] 153 reports: list[AirSigmet] | None = None 154 155 def __init__(self): # type: ignore 156 self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")] 157 self._raw, self.raw = [], [] 158 159 async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]: 160 source = self._services[index].root 161 reports = await self._services[index].async_fetch(timeout=timeout) # type: ignore 162 raw: list[tuple[str, str | None]] = [(report, source) for report in reports if report] 163 return raw 164 165 def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool: 166 """Updates fetched reports and returns whether they've changed""" 167 return aio.run(self.async_update(timeout, disable_post=disable_post)) 168 169 async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool: 170 """Updates fetched reports and returns whether they've changed""" 171 coros = [self._update(i, timeout) for i in range(len(self._services))] 172 data = await aio.gather(*coros) 173 raw = list(chain.from_iterable(data)) 174 reports = [i[0] for i in raw] 175 if raw == 
self._raw: 176 return False 177 self._raw, self.raw = raw, reports 178 self.last_updated = datetime.now(tz=timezone.utc) 179 # Parse reports if not disabled 180 if not disable_post: 181 parsed = [] 182 for report, source in raw: 183 try: 184 if obj := AirSigmet.from_report(report): 185 obj.source = source 186 parsed.append(obj) 187 except Exception as exc: # noqa: BLE001 188 exceptions.exception_intercept(exc, raw={"report": report}) 189 self.reports = parsed 190 return True 191 192 def along(self, coords: list[Coord]) -> list[AirSigmet]: 193 """Returns available reports the intersect a flight path""" 194 if LineString is None: 195 extra = "shape" 196 raise MissingExtraModule(extra) 197 if self.reports is None: 198 return [] 199 path = LineString([c.pair for c in coords]) 200 return [r for r in self.reports if r.intersects(path)] 201 202 def contains(self, coord: Coord) -> list[AirSigmet]: 203 """Returns available reports that contain a coordinate""" 204 if self.reports is None: 205 return [] 206 return [r for r in self.reports if r.contains(coord)] 207 208 209# N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141 210_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?") 211 212# FROM 60NW ISN-INL-TVC-GIJ-UIN-FSD-BIL-60NW ISN 213# FROM 70SSW ISN TO 20NNW FAR TO 70E DLH TO 40SE EAU TO 80SE RAP TO 40NNW BFF TO 70SSW 214_NAVAID_PATTERN = re.compile(r"\b(\d{1,3}[NESW]{1,3} [A-z]{3}-?\b)|((-|(TO )|(FROM ))[A-z]{3}\b)") 215 216# N OF N2050 AND S OF N2900 217_LATTERAL_PATTERN = re.compile(r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b") 218 219NAVAIDS = LazyLoad("navaids") 220 221# Used to assist parsing after sanitized. Removed after parse 222_FLAGS = { 223 "...": " <elip> ", 224 "..": " <elip> ", 225 ". 
": " <break> ", 226 "/VIS ": " <vis> VIS ", 227} 228 229 230def _parse_prep(report: str) -> list[str]: 231 """Prepares sanitized string by replacing elements with flags""" 232 report = report.rstrip(".") 233 for key, val in _FLAGS.items(): 234 report = report.replace(key, val) 235 return report.split() 236 237 238def _clean_flags(data: list[str]) -> list[str]: 239 return [i for i in data if i[0] != "<"] 240 241 242def _bulletin(value: str) -> Bulletin: 243 # if len(value) != 6: 244 # return None 245 type_key = value[:2] 246 return Bulletin( 247 repr=value, 248 type=Code(repr=type_key, value=BULLETIN_TYPES[type_key]), 249 country=value[2:4], 250 number=int(value[4:]), 251 ) 252 253 254def _header(data: list[str]) -> tuple[list[str], Bulletin, str, str, str | None]: 255 bulletin = _bulletin(data[0]) 256 correction, end = (data[3], 4) if len(data[3]) == 3 else (None, 3) 257 return data[end:], bulletin, data[1], data[2], correction 258 259 260def _spacetime( 261 data: list[str], 262) -> tuple[list[str], str, str, str | None, str, str | None]: 263 area = data.pop(0) 264 # Skip airmet type + time repeat 265 if data[0] == "WA" and data[1].isdigit(): 266 data = data[2:] 267 area = area[:-1] # Remove type label from 3-letter ident 268 valid_index = data.index("VALID") 269 report_type = " ".join(data[:valid_index]) 270 data = data[valid_index + 1 :] 271 if data[0] == "UNTIL": 272 start_time = None 273 end_time = data[1] 274 data = data[2:] 275 else: 276 target = "-" if "-" in data[0] else "/" 277 start_time, end_time = data.pop(0).split(target) 278 # KMCO- ORL FIR 279 if data[0][-1] == "-": 280 station = data.pop(0)[:-1] 281 # KMCO - KMCO 282 elif data[1] == "-" and len(data[0]) == 4: 283 station = data.pop(0) 284 data.pop(0) 285 else: 286 station = None 287 return data, area, report_type, start_time, end_time, station 288 289 290def _first_index(data: list[str], *targets: str) -> int: 291 for target in targets: 292 with suppress(ValueError): 293 return data.index(target) 
294 return -1 295 296 297def _region(data: list[str]) -> tuple[list[str], str]: 298 # FIR/CTA region name 299 # Or non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW 300 name_end = _first_index(data, "FIR", "CTA") + 1 or _first_index(data, "FROM") 301 # State list 302 if not name_end: 303 for item in data: 304 if len(item) == 2: 305 name_end += 1 306 else: 307 break 308 name = " ".join(data[:name_end]) 309 return data[name_end:], name 310 311 312def _time(data: list[str], issued: date | None = None) -> tuple[list[str], Timestamp | None, Timestamp | None]: 313 """Extracts the start and/or end time based on a couple starting elements""" 314 index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK") 315 if index == -1: 316 return data, None, None 317 start_item = data.pop(index) 318 start, end, observed = None, None, None 319 if "-" in data[index]: 320 start_item, end_item = data.pop(index).split("-") 321 start = core.make_timestamp(start_item, time_only=len(start_item) < 6, target_date=issued) 322 end = core.make_timestamp(end_item, time_only=len(end_item) < 6, target_date=issued) 323 elif len(data[index]) >= 4 and data[index][:4].isdigit(): 324 observed = core.make_timestamp(data.pop(index), time_only=True, target_date=issued) 325 if index > 0 and data[index - 1] == "OBS": 326 data.pop(index - 1) 327 for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"): 328 with suppress(ValueError): 329 data.remove(remv) 330 if observed: 331 if start_item in ("UNTIL", "VALID"): 332 end = observed 333 else: 334 start = observed 335 return data, start, end 336 337 338def _coord_value(value: str) -> float: 339 if value[0] in ("N", "S"): 340 index, strip, replace = 3, "N", "S" 341 else: 342 index, strip, replace = 4, "E", "W" 343 num = f"{value[:index]}.{value[index:]}".lstrip(strip).replace(replace, "-") 344 return float(num) 345 346 347def _position(data: list[str]) -> tuple[list[str], Coord | None]: 348 try: 349 index = data.index("PSN") 350 except 
ValueError: 351 return data, None 352 data.pop(index) 353 raw = f"{data[index]} {data[index + 1]}" 354 lat = _coord_value(data.pop(index)) 355 lon = _coord_value(data.pop(index)) 356 return data, Coord(lat=lat, lon=lon, repr=raw) 357 358 359def _movement(data: list[str], units: Units) -> tuple[list[str], Units, Movement | None]: 360 with suppress(ValueError): 361 data.remove("STNR") 362 speed = core.make_number("STNR") 363 return data, units, Movement(repr="STNR", direction=None, speed=speed) 364 try: 365 index = data.index("MOV") 366 except ValueError: 367 return data, units, None 368 raw = data.pop(index) 369 direction_str = data.pop(index) 370 # MOV CNL 371 if direction_str == "CNL": 372 return data, units, None 373 raw += f" {direction_str} " 374 # MOV FROM 23040KT 375 if direction_str == "FROM": 376 value = data[index][:3] 377 raw += value 378 direction = core.make_number(value) 379 data[index] = data[index][3:] 380 # MOV E 45KMH 381 else: 382 direction = core.make_number(direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES) 383 speed = None 384 with suppress(IndexError): 385 kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH") 386 if kt_unit or kmh_unit: 387 units.wind_speed = "kmh" if kmh_unit else "kt" 388 speed_str = data.pop(index) 389 raw += speed_str 390 # Remove bottom speed Ex: MOV W 05-10KT 391 if "-" in speed_str: 392 speed_str = speed_str[speed_str.find("-") + 1 :] 393 speed = core.make_number(speed_str[: -3 if kmh_unit else -2]) 394 return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed) 395 396 397def _info_from_match(match: re.Match, start: int) -> tuple[str, int]: 398 """Returns the matching text and starting location if none yet available""" 399 if start == -1: 400 start = match.start() 401 return match.group(), start 402 403 404def _pre_break(report: str) -> str: 405 break_index = report.find(" <break> ") 406 return report[:break_index] if break_index != -1 else report 407 408 
409def _bounds_from_latterals(report: str, start: int) -> tuple[str, list[str], int]: 410 """Extract coordinate latterals from report Ex: N OF N2050""" 411 bounds = [] 412 for match in _LATTERAL_PATTERN.finditer(_pre_break(report)): 413 group, start = _info_from_match(match, start) 414 bounds.append(group.removesuffix(" AND")) 415 report = report.replace(group, " ") 416 return report, bounds, start 417 418 419def _coords_from_text(report: str, start: int) -> tuple[str, list[Coord], int]: 420 """Extract raw coordinate values from report Ex: N4409 E01506""" 421 coords = [] 422 for match in _COORD_PATTERN.finditer(_pre_break(report)): 423 group, start = _info_from_match(match, start) 424 text = group.strip(" -") 425 lat, lon = text.split() 426 coord = Coord(lat=_coord_value(lat), lon=_coord_value(lon), repr=text) 427 coords.append(coord) 428 report = report.replace(group, " ") 429 return report, coords, start 430 431 432def _coords_from_navaids(report: str, start: int) -> tuple[str, list[Coord], int]: 433 """Extract navaid referenced coordinates from report Ex: 30SSW BNA""" 434 coords, navs = [], [] 435 for match in _NAVAID_PATTERN.finditer(_pre_break(report)): 436 group, start = _info_from_match(match, start) 437 report = report.replace(group, " ") 438 group = group.strip("-").removeprefix("FROM ").removeprefix("TO ") 439 navs.append((group, *group.split())) 440 locs = to_coordinates([n[2 if len(n) == 3 else 1] for n in navs]) 441 for i, nav in enumerate(navs): 442 value = nav[0] 443 if len(nav) == 3: 444 vector, num_index = nav[1], 0 445 while vector[num_index].isdigit(): 446 num_index += 1 447 distance, bearing = ( 448 int(vector[:num_index]), 449 CARDINAL_DEGREES[vector[num_index:]], 450 ) 451 loc = geo_distance(nautical=distance).destination(locs[i].pair, bearing=bearing) 452 coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=value) 453 else: 454 coord = locs[i] 455 coord.repr = value 456 coords.append(coord) 457 return report, coords, start 458 459 460def 
_bounds(data: list[str]) -> tuple[list[str], list[Coord], list[str]]: 461 """Extract coordinate bounds by coord, navaid, and latterals""" 462 report, start = " ".join(data), -1 463 report, bounds, start = _bounds_from_latterals(report, start) 464 report, coords, start = _coords_from_text(report, start) 465 report, navs, start = _coords_from_navaids(report, start) 466 coords += navs 467 for target in ("FROM", "WI", "BOUNDED", "OBS"): 468 index = report.find(f"{target} ") 469 if index != -1 and index < start: 470 start = index 471 report = report[:start] + report[report.rfind(" ") :] 472 data = [s for s in report.split() if s] 473 return data, coords, bounds 474 475 476def _altitudes(data: list[str], units: Units) -> tuple[list[str], Units, Number | None, Number | None]: 477 """Extract the floor and ceiling altitudes""" 478 floor, ceiling = None, None 479 for i, item in enumerate(data): 480 # BTN FL180 AND FL330 481 if item == "BTN" and len(data) > i + 2 and data[i + 2] == "AND": 482 floor, units = core.make_altitude(data[i + 1], units) 483 ceiling, units = core.make_altitude(data[i + 3], units) 484 data = data[:i] + data[i + 4 :] 485 break 486 # TOPS ABV FL450 487 if item in ("TOP", "TOPS", "BLW"): 488 if data[i + 1] == "ABV": 489 ceiling = core.make_number(f"ABV {data[i + 2]}") 490 data = data[:i] + data[i + 3 :] 491 break 492 if data[i + 1] == "BLW": 493 ceiling = core.make_number(f"BLW {data[i + 2]}") 494 data = data[:i] + data[i + 3 :] 495 break 496 # TOPS TO FL310 497 if data[i + 1] == "TO": 498 data.pop(i) 499 ceiling, units = core.make_altitude(data[i + 1], units) 500 data = data[:i] + data[i + 2 :] 501 # CIG BLW 010 502 if data[i - 1] == "CIG": 503 data.pop(i - 1) 504 break 505 # FL060/300 SFC/FL160 506 if core.is_altitude(item): 507 if "/" in item: 508 floor_val, ceiling_val = item.split("/") 509 floor, units = core.make_altitude(floor_val, units) 510 if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[:2] != "FL": 511 ceiling, units = 
core.make_altitude(ceiling_val, units, force_fl=True) 512 else: 513 ceiling, units = core.make_altitude(ceiling_val, units) 514 else: 515 ceiling, units = core.make_altitude(item, units) 516 data.pop(i) 517 break 518 return data, units, floor, ceiling 519 520 521def _weather_type(data: list[str]) -> tuple[list[str], Code | None]: 522 weather = None 523 report = " ".join(data) 524 for key, val in WEATHER_TYPES.items(): 525 if key in report: 526 weather = Code(repr=key, value=val) 527 data = [i for i in report.replace(key, "").split() if i] 528 break 529 return data, weather 530 531 532def _intensity(data: list[str]) -> tuple[list[str], Code | None]: 533 if not data: 534 return data, None 535 try: 536 value = INTENSITY[data[-1]] 537 code = data.pop() 538 return data, Code(repr=code, value=value) 539 except KeyError: 540 return data, None 541 542 543def _sigmet_observation(data: list[str], units: Units, issued: date | None = None) -> tuple[AirSigObservation, Units]: 544 data, start_time, end_time = _time(data, issued) 545 data, position = _position(data) 546 data, coords, bounds = _bounds(data) 547 data, units, movement = _movement(data, units) 548 data, intensity = _intensity(data) 549 data, units, floor, ceiling = _altitudes(data, units) 550 data, weather = _weather_type(data) 551 struct = AirSigObservation( 552 type=weather, 553 start_time=start_time, 554 end_time=end_time, 555 position=position, 556 floor=floor, 557 ceiling=ceiling, 558 coords=coords, 559 bounds=bounds, 560 movement=movement, 561 intensity=intensity, 562 other=_clean_flags(data), 563 ) 564 return struct, units 565 566 567def _observations( 568 data: list[str], units: Units, issued: date | None = None 569) -> tuple[Units, AirSigObservation | None, AirSigObservation | None]: 570 observation, forecast, forecast_index = None, None, -1 571 forecast_index = _first_index(data, "FCST", "OUTLOOK", "OTLK") 572 if forecast_index == -1: 573 observation, units = _sigmet_observation(data, units, issued) 574 # 6 
is arbitrary. Will likely change or be more precise later 575 elif forecast_index < 6: 576 forecast, units = _sigmet_observation(data, units, issued) 577 else: 578 observation, units = _sigmet_observation(data[:forecast_index], units, issued) 579 forecast, units = _sigmet_observation(data[forecast_index:], units, issued) 580 return units, observation, forecast 581 582 583_REPLACE = { 584 " MO V ": " MOV ", 585 " STNRY": " STNR", 586 " STCNRY": " STNR", 587 " N-NE ": " NNE ", 588 " N-NW ": " NNW ", 589 " E-NE ": " ENE ", 590 " E-SE ": " ESE ", 591 " S-SE ": " SSE ", 592 " S-SW ": " SSW ", 593 " W-SW ": " WSW ", 594 " W-NW ": " WNW ", 595} 596 597 598def _find_first_digit(item: str) -> int: 599 return next((i for i, char in enumerate(item) if char.isdigit()), -1) 600 601 602def sanitize(report: str) -> str: 603 """Sanitized AIRMET / SIGMET report string""" 604 report = report.strip(" =") 605 for key, val in _REPLACE.items(): 606 report = report.replace(key, val) 607 data = report.split() 608 for i, item in reversed(list(enumerate(data))): 609 # Remove extra element on altitude Ex: FL450Z skip 1000FT 610 if ( 611 len(item) > 4 612 and not item[-1].isdigit() 613 and item[-2:] != "FT" 614 and item[-1] != "M" 615 and core.is_altitude(item[:-1]) 616 ): 617 data[i] = item[:-1] 618 # Split attached movement direction Ex: NE05KT 619 if len(item) >= 4 and item.endswith(("KT", "KMH")) and item[: _find_first_digit(item)] in CARDINALS: 620 index = _find_first_digit(item) 621 direction = item[:index] 622 data.insert(i + 1, item[index:]) 623 data[i] = direction 624 return " ".join(data) 625 626 627def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]: 628 """Parse AIRMET / SIGMET report string""" 629 units = Units.international() 630 sanitized = sanitize(report) 631 data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized)) 632 data, area, report_type, start_time, end_time, station = _spacetime(data) 633 body = sanitized[sanitized.find(" 
".join(data[:2])) :] 634 # Trim AIRMET type 635 if data[0] == "AIRMET": 636 with suppress(ValueError): 637 data = data[data.index("<elip>") + 1 :] 638 data, region = _region(data) 639 units, observation, forecast = _observations(data, units, issued) 640 struct = AirSigmetData( 641 raw=report, 642 sanitized=sanitized, 643 station=station, 644 time=core.make_timestamp(time, target_date=issued), 645 remarks=None, 646 bulletin=bulletin, 647 issuer=issuer, 648 correction=correction, 649 area=area, 650 type=report_type, 651 start_time=core.make_timestamp(start_time, target_date=issued), 652 end_time=core.make_timestamp(end_time, target_date=issued), 653 body=body, 654 region=region, 655 observation=observation, 656 forecast=forecast, 657 ) 658 return struct, units
class AirSigmet(AVWXBase):
    """
    In addition to the manager, you can use the `avwx.AirSigmet` class like any
    other report when you supply the report string via `parse` or
    `from_report`.

    ```python
    >>> from avwx import AirSigmet
    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
    >>> sigmet = AirSigmet.from_report(report)
    >>> sigmet is not None
    True
    >>> sigmet.last_updated
    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
    >>> sigmet.data.observation.coords
    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
    >>> sigmet.data.observation.intensity
    Code(repr='NC', value='No change')
    >>> sigmet.data.observation.ceiling
    Number(repr='FL410', value=410, spoken='flight level four one zero')
    ```
    """

    data: AirSigmetData | None = None

    def _post_parse(self) -> None:
        """Parses the raw report, when one is set, into `data` and `units`"""
        if self.raw:
            self.data, self.units = parse(self.raw, self.issued)

    @staticmethod
    def sanitize(report: str) -> str:
        """Sanitizes the report string"""
        return sanitize(report)

    def intersects(self, path: LineString) -> bool:
        """Returns True if the report area intersects a flight path

        Raises MissingExtraModule when shapely could not be imported.
        """
        # When shapely is missing the module binds LineString to TypeAlias, so
        # checking only for None would never detect the missing dependency
        if LineString is None or LineString is TypeAlias:
            extra = "shape"
            raise MissingExtraModule(extra)
        if not self.data:
            return False
        for data in (self.data.observation, self.data.forecast):
            if data:
                poly = data.poly
                if poly and path.intersects(poly):
                    return True
        return False

    def contains(self, coord: Coord) -> bool:
        """Returns True if the report area contains a coordinate"""
        if not self.data:
            return False
        for data in (self.data.observation, self.data.forecast):
            if data:
                poly = data.poly
                if poly and coord.point.within(poly):
                    return True
        return False
In addition to the manager, you can use the avwx.AirSigmet class like any
other report when you supply the report string via parse or
from_report.
>>> from avwx import AirSigmet
>>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
>>> sigmet = AirSigmet.from_report(report)
>>> sigmet is not None
True
>>> sigmet.last_updated
datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
>>> sigmet.data.observation.coords
[Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
>>> sigmet.data.observation.intensity
Code(repr='NC', value='No change')
>>> sigmet.data.observation.ceiling
Number(repr='FL410', value=410, spoken='flight level four one zero')
@staticmethod
def sanitize(report: str) -> str:
    """Cleans up a report string by delegating to the module-level `sanitize`"""
    cleaned = sanitize(report)
    return cleaned
Sanitizes the report string
def intersects(self, path: LineString) -> bool:
    """Returns True if the report area intersects a flight path

    Checks the observation area first and then the forecast area. Returns
    False when the report has no parsed data or neither section has a polygon.

    Raises MissingExtraModule when shapely could not be imported.
    """
    # When shapely is missing the module binds LineString to TypeAlias, so
    # checking only for None would never detect the missing dependency
    if LineString is None or LineString is TypeAlias:
        extra = "shape"
        raise MissingExtraModule(extra)
    if not self.data:
        return False
    for data in (self.data.observation, self.data.forecast):
        if data:
            poly = data.poly
            if poly and path.intersects(poly):
                return True
    return False
Returns True if the report area intersects a flight path
def contains(self, coord: Coord) -> bool:
    """Reports whether the coordinate lies within the observation or forecast area

    Returns False when the report has no parsed data or when neither section
    provides a polygon.
    """
    parsed = self.data
    if not parsed:
        return False
    for section in (parsed.observation, parsed.forecast):
        # Skip a section that is missing
        if not section:
            continue
        area = section.poly
        # Skip a section that has no polygon
        if not area:
            continue
        if coord.point.within(area):
            return True
    return False
Returns True if the report area contains a coordinate
Inherited Members
class AirSigManager:
    """
    Because of the global nature of these report types, we don't initialize a
    report class with a station ident like the other report types. Instead, we
    use a class to manage and update the list of all active SIGMET and AIRMET
    reports.

    ```python
    >>> from avwx import AirSigManager
    >>> from avwx.structs import Coord
    >>> manager = AirSigManager()
    >>> manager.update()
    True
    >>> manager.last_updated
    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
    >>> len(manager.reports)
    113
    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
    5
    >>> manager.reports[0].data.bulletin.type
    Code(repr='WA', value='airmet')
    >>> manager.reports[0].data.type
    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
    ```
    """

    _services: list[Service]
    # (report, source) pairs from the last fetch that changed
    _raw: list[tuple[str, str | None]]
    last_updated: datetime | None = None
    raw: list[str]
    reports: list[AirSigmet] | None = None

    def __init__(self):  # type: ignore
        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
        self._raw, self.raw = [], []

    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
        """Fetches reports from the service at `index`, paired with that service's root"""
        source = self._services[index].root
        reports = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
        raw: list[tuple[str, str | None]] = [(report, source) for report in reports if report]
        return raw

    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        return aio.run(self.async_update(timeout, disable_post=disable_post))

    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        coros = [self._update(i, timeout) for i in range(len(self._services))]
        data = await aio.gather(*coros)
        raw = list(chain.from_iterable(data))
        reports = [i[0] for i in raw]
        if raw == self._raw:
            return False
        self._raw, self.raw = raw, reports
        self.last_updated = datetime.now(tz=timezone.utc)
        # Parse reports if not disabled
        if not disable_post:
            parsed = []
            for report, source in raw:
                try:
                    if obj := AirSigmet.from_report(report):
                        obj.source = source
                        parsed.append(obj)
                except Exception as exc:  # noqa: BLE001
                    # One bad report is handed to the interceptor instead of aborting the batch
                    exceptions.exception_intercept(exc, raw={"report": report})
            self.reports = parsed
        return True

    def along(self, coords: list[Coord]) -> list[AirSigmet]:
        """Returns available reports that intersect a flight path

        Raises MissingExtraModule when shapely could not be imported.
        """
        # When shapely is missing the module binds LineString to TypeAlias, so
        # checking only for None would never detect the missing dependency
        if LineString is None or LineString is TypeAlias:
            extra = "shape"
            raise MissingExtraModule(extra)
        if self.reports is None:
            return []
        path = LineString([c.pair for c in coords])
        return [r for r in self.reports if r.intersects(path)]

    def contains(self, coord: Coord) -> list[AirSigmet]:
        """Returns available reports that contain a coordinate"""
        if self.reports is None:
            return []
        return [r for r in self.reports if r.contains(coord)]
Because of the global nature of these report types, we don't initialize a report class with a station ident like the other report types. Instead, we use a class to manage and update the list of all active SIGMET and AIRMET reports.
>>> from avwx import AirSigManager
>>> from avwx.structs import Coord
>>> manager = AirSigManager()
>>> manager.update()
True
>>> manager.last_updated
datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
>>> len(manager.reports)
113
>>> len(manager.contains(Coord(lat=33.12, lon=-105)))
5
>>> manager.reports[0].data.bulletin.type
Code(repr='WA', value='airmet')
>>> manager.reports[0].data.type
'AIRMET SIERRA FOR IFR AND MTN OBSCN'
def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
    """Runs `async_update` to completion and returns whether the reports changed

    `timeout` and `disable_post` are passed through to `async_update`.
    """
    refresh = self.async_update(timeout, disable_post=disable_post)
    return aio.run(refresh)
Updates fetched reports and returns whether they've changed
async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
    """Fetches from every service and returns whether the reports changed

    Returns False, leaving all state untouched, when the fetched
    (report, source) pairs equal the previous fetch. Otherwise stores the new
    raw reports and the update time, then parses each report unless
    `disable_post` is set.
    """
    pending = [self._update(position, timeout) for position, _ in enumerate(self._services)]
    batches = await aio.gather(*pending)
    fetched = [pair for batch in batches for pair in batch]
    if fetched == self._raw:
        return False
    self._raw = fetched
    self.raw = [text for text, _ in fetched]
    self.last_updated = datetime.now(tz=timezone.utc)
    # Parsing was disabled, so the existing parsed reports are left as they are
    if disable_post:
        return True
    parsed = []
    for report, source in fetched:
        try:
            obj = AirSigmet.from_report(report)
            if obj:
                obj.source = source
                parsed.append(obj)
        except Exception as exc:  # noqa: BLE001
            # One bad report is handed to the interceptor instead of aborting the batch
            exceptions.exception_intercept(exc, raw={"report": report})
    self.reports = parsed
    return True
Updates fetched reports and returns whether they've changed
def along(self, coords: list[Coord]) -> list[AirSigmet]:
    """Returns available reports that intersect a flight path

    The path is a LineString built from each coordinate's `pair`. Returns an
    empty list when no reports have been parsed yet.

    Raises MissingExtraModule when shapely could not be imported.
    """
    # When shapely is missing the module binds LineString to TypeAlias, so
    # checking only for None would never detect the missing dependency
    if LineString is None or LineString is TypeAlias:
        extra = "shape"
        raise MissingExtraModule(extra)
    if self.reports is None:
        return []
    path = LineString([c.pair for c in coords])
    return [r for r in self.reports if r.intersects(path)]
Returns available reports that intersect a flight path
def sanitize(report: str) -> str:
    """Normalizes an AIRMET / SIGMET report string before parsing

    Strips surrounding spaces and "=", applies the `_REPLACE` substitutions,
    trims a stray trailing character from altitude elements, and splits a
    movement direction that is attached to its speed.
    """
    text = report.strip(" =")
    for wrong, right in _REPLACE.items():
        text = text.replace(wrong, right)
    tokens = text.split()
    # Walk backwards so inserting a split-off element never shifts an unvisited index
    for pos in range(len(tokens) - 1, -1, -1):
        token = tokens[pos]
        # Remove extra element on altitude Ex: FL450Z skip 1000FT
        has_stray_suffix = (
            len(token) > 4
            and not token[-1].isdigit()
            and token[-2:] != "FT"
            and token[-1] != "M"
            and core.is_altitude(token[:-1])
        )
        if has_stray_suffix:
            tokens[pos] = token[:-1]
        # Split attached movement direction Ex: NE05KT
        if len(token) < 4 or not token.endswith(("KT", "KMH")):
            continue
        cut = _find_first_digit(token)
        if token[:cut] in CARDINALS:
            tokens.insert(pos + 1, token[cut:])
            tokens[pos] = token[:cut]
    return " ".join(tokens)
Sanitizes an AIRMET / SIGMET report string
def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
    """Parse AIRMET / SIGMET report string

    `issued` is forwarded as the target date when building timestamps.
    Returns the parsed data and the units, which start as international units.
    """
    units = Units.international()
    sanitized = sanitize(report)
    data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized))
    data, area, report_type, start_time, end_time, station = _spacetime(data)
    # The body starts at the first two remaining elements. str.find returns -1
    # when they are not adjacent in the sanitized text, which as a slice start
    # would keep only the last character, so fall back to the whole text
    body_start = sanitized.find(" ".join(data[:2]))
    body = sanitized[body_start:] if body_start != -1 else sanitized
    # Trim AIRMET type
    if data[0] == "AIRMET":
        with suppress(ValueError):
            data = data[data.index("<elip>") + 1 :]
    data, region = _region(data)
    units, observation, forecast = _observations(data, units, issued)
    struct = AirSigmetData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=None,
        bulletin=bulletin,
        issuer=issuer,
        correction=correction,
        area=area,
        type=report_type,
        start_time=core.make_timestamp(start_time, target_date=issued),
        end_time=core.make_timestamp(end_time, target_date=issued),
        body=body,
        region=region,
        observation=observation,
        forecast=forecast,
    )
    return struct, units
Parse AIRMET / SIGMET report string