avwx.current.airsigmet
A SIGMET (Significant Meteorological Information) is a weather advisory for the safety of all aircraft. They are divided into:
- Convective - thunderstorms, hail, and cyclones
- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation
An AIRMET (Airman's Meteorological Information) is a weather advisory for smaller aircraft or VFR navigation. They are divided into:
- Sierra - IFR conditions like low ceilings and mountain obscuration
- Tango - turbulence and high surface winds
- Zulu - icing and freezing levels
Both types share a similar report format and therefore are combined into a
single handling class. The `Bulletin` and weather type can be used to classify
each as a SIGMET or AIRMET for filtering purposes.
1""" 2A SIGMET (Significant Meteorological Information) is a weather advisory for the 3safety of all aircraft. They are divided into: 4 5- Convective - thunderstorms, hail, and cyclones 6- Non-Convective - turbulence, icing, dust clouds, volcanic activity, and radiation 7 8An AIRMET (Airman's Meteorological Information) is a weather advisory for 9smaller aircraft or VFR navigation. They are divided into: 10 11- Sierra - IFR conditions like low ceilings and mountain obscuration 12- Tango - turbulence and high surface winds 13- Zulu - icing and freezing levels 14 15Both types share a similar report format and therefore are combined into a 16single handling class. The `Bulletin` and weather type can be used to classify 17each as a SIGMET or AIRMET for filtering purposes. 18""" 19 20# stdlib 21from __future__ import annotations 22 23import asyncio as aio 24import re 25from contextlib import suppress 26from datetime import date, datetime, timezone 27from itertools import chain 28 29# library 30from geopy.distance import distance as geo_distance # type: ignore 31 32# module 33from avwx import exceptions 34from avwx.base import AVWXBase 35from avwx.exceptions import MissingExtraModule 36from avwx.flight_path import to_coordinates 37from avwx.load_utils import LazyLoad 38from avwx.parsing import core 39from avwx.service.bulk import NoaaBulk, NoaaIntl, Service 40from avwx.static.airsigmet import BULLETIN_TYPES, INTENSITY, WEATHER_TYPES 41from avwx.static.core import CARDINAL_DEGREES, CARDINALS 42from avwx.structs import ( 43 AirSigmetData, 44 AirSigObservation, 45 Bulletin, 46 Code, 47 Coord, 48 Movement, 49 Number, 50 Timestamp, 51 Units, 52) 53 54try: 55 from shapely.geometry import LineString # type: ignore 56except ModuleNotFoundError: 57 LineString = None 58 59 60class AirSigmet(AVWXBase): 61 """ 62 In addition to the manager, you can use the `avwx.AirSigmet` class like any 63 other report when you supply the report string via `parse` or 64 `from_report`. 
65 66 ```python 67 >>> from avwx import AirSigmet 68 >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC=' 69 >>> sigmet = AirSigmet.from_report(report) 70 True 71 >>> sigmet.last_updated 72 datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc) 73 >>> sigmet.data.observation.coords 74 [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'), 75 Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'), 76 Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'), 77 Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')] 78 >>> sigmet.data.observation.intensity 79 Code(repr='NC', value='No change') 80 >>> sigmet.data.observation.ceiling 81 Number(repr='FL410', value=410, spoken='flight level four one zero') 82 ``` 83 """ 84 85 data: AirSigmetData | None = None 86 87 def _post_parse(self) -> None: 88 if self.raw: 89 self.data, self.units = parse(self.raw, self.issued) 90 91 @staticmethod 92 def sanitize(report: str) -> str: 93 """Sanitizes the report string""" 94 return sanitize(report) 95 96 def intersects(self, path: LineString) -> bool: 97 """Returns True if the report area intersects a flight path""" 98 if LineString is None: 99 extra = "shape" 100 raise MissingExtraModule(extra) 101 if not self.data: 102 return False 103 for data in (self.data.observation, self.data.forecast): 104 if data: 105 poly = data.poly 106 if poly and path.intersects(poly): 107 return True 108 return False 109 110 def contains(self, coord: Coord) -> bool: 111 """Returns True if the report area contains a coordinate""" 112 if not self.data: 113 return False 114 for data in (self.data.observation, self.data.forecast): 115 if data: 116 poly = data.poly 117 if poly and coord.point.within(poly): 118 return True 119 return False 120 121 122class AirSigManager: 123 """ 124 Because of the global nature of these report types, we don't initialize a 125 
report class with a station ident like the other report types. Instead, we 126 use a class to manage and update the list of all active SIGMET and AIRMET 127 reports. 128 129 ```python 130 >>> from avwx import AirSigManager 131 >>> from avwx.structs import Coord 132 >>> manager = AirSigManager() 133 >>> manager.update() 134 True 135 >>> manager.last_updated 136 datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc) 137 >>> len(manager.reports) 138 113 139 >>> len(manager.contains(Coord(lat=33.12, lon=-105))) 140 5 141 >>> manager.reports[0].data.bulletin.type 142 Code(repr='WA', value='airmet') 143 >>> manager.reports[0].data.type 144 'AIRMET SIERRA FOR IFR AND MTN OBSCN' 145 ``` 146 """ 147 148 _services: list[Service] 149 _raw: list[tuple[str, str | None]] 150 last_updated: datetime | None = None 151 raw: list[str] 152 reports: list[AirSigmet] | None = None 153 154 def __init__(self): # type: ignore 155 self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")] 156 self._raw, self.raw = [], [] 157 158 async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]: 159 source = self._services[index].root 160 reports = await self._services[index].async_fetch(timeout=timeout) # type: ignore 161 raw: list[tuple[str, str | None]] = [(report, source) for report in reports if report] 162 return raw 163 164 def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool: 165 """Updates fetched reports and returns whether they've changed""" 166 return aio.run(self.async_update(timeout, disable_post=disable_post)) 167 168 async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool: 169 """Updates fetched reports and returns whether they've changed""" 170 coros = [self._update(i, timeout) for i in range(len(self._services))] 171 data = await aio.gather(*coros) 172 raw = list(chain.from_iterable(data)) 173 reports = [i[0] for i in raw] 174 if raw == self._raw: 175 return False 176 
self._raw, self.raw = raw, reports 177 self.last_updated = datetime.now(tz=timezone.utc) 178 # Parse reports if not disabled 179 if not disable_post: 180 parsed = [] 181 for report, source in raw: 182 try: 183 if obj := AirSigmet.from_report(report): 184 obj.source = source 185 parsed.append(obj) 186 except Exception as exc: # noqa: BLE001 187 exceptions.exception_intercept(exc, raw={"report": report}) 188 self.reports = parsed 189 return True 190 191 def along(self, coords: list[Coord]) -> list[AirSigmet]: 192 """Returns available reports the intersect a flight path""" 193 if LineString is None: 194 extra = "shape" 195 raise MissingExtraModule(extra) 196 if self.reports is None: 197 return [] 198 path = LineString([c.pair for c in coords]) 199 return [r for r in self.reports if r.intersects(path)] 200 201 def contains(self, coord: Coord) -> list[AirSigmet]: 202 """Returns available reports that contain a coordinate""" 203 if self.reports is None: 204 return [] 205 return [r for r in self.reports if r.contains(coord)] 206 207 208# N1429 W09053 - N1427 W09052 - N1411 W09139 - N1417 W09141 209_COORD_PATTERN = re.compile(r"\b[NS]\d{4} [EW]\d{5}\b( -)?") 210 211# FROM 60NW ISN-INL-TVC-GIJ-UIN-FSD-BIL-60NW ISN 212# FROM 70SSW ISN TO 20NNW FAR TO 70E DLH TO 40SE EAU TO 80SE RAP TO 40NNW BFF TO 70SSW 213_NAVAID_PATTERN = re.compile(r"\b(\d{1,3}[NESW]{1,3} [A-z]{3}-?\b)|((-|(TO )|(FROM ))[A-z]{3}\b)") 214 215# N OF N2050 AND S OF N2900 216_LATTERAL_PATTERN = re.compile(r"\b([NS] OF [NS]\d{2,4})|([EW] OF [EW]\d{3,5})( AND)?\b") 217 218NAVAIDS = LazyLoad("navaids") 219 220# Used to assist parsing after sanitized. Removed after parse 221_FLAGS = { 222 "...": " <elip> ", 223 "..": " <elip> ", 224 ". 
": " <break> ", 225 "/VIS ": " <vis> VIS ", 226} 227 228 229def _parse_prep(report: str) -> list[str]: 230 """Prepares sanitized string by replacing elements with flags""" 231 report = report.rstrip(".") 232 for key, val in _FLAGS.items(): 233 report = report.replace(key, val) 234 return report.split() 235 236 237def _clean_flags(data: list[str]) -> list[str]: 238 return [i for i in data if i[0] != "<"] 239 240 241def _bulletin(value: str) -> Bulletin: 242 # if len(value) != 6: 243 # return None 244 type_key = value[:2] 245 return Bulletin( 246 repr=value, 247 type=Code(repr=type_key, value=BULLETIN_TYPES[type_key]), 248 country=value[2:4], 249 number=int(value[4:]), 250 ) 251 252 253def _header(data: list[str]) -> tuple[list[str], Bulletin, str, str, str | None]: 254 bulletin = _bulletin(data[0]) 255 correction, end = (data[3], 4) if len(data[3]) == 3 else (None, 3) 256 return data[end:], bulletin, data[1], data[2], correction 257 258 259def _spacetime( 260 data: list[str], 261) -> tuple[list[str], str, str, str | None, str, str | None]: 262 area = data.pop(0) 263 # Skip airmet type + time repeat 264 if data[0] == "WA" and data[1].isdigit(): 265 data = data[2:] 266 area = area[:-1] # Remove type label from 3-letter ident 267 valid_index = data.index("VALID") 268 report_type = " ".join(data[:valid_index]) 269 data = data[valid_index + 1 :] 270 if data[0] == "UNTIL": 271 start_time = None 272 end_time = data[1] 273 data = data[2:] 274 else: 275 target = "-" if "-" in data[0] else "/" 276 start_time, end_time = data.pop(0).split(target) 277 # KMCO- ORL FIR 278 if data[0][-1] == "-": 279 station = data.pop(0)[:-1] 280 # KMCO - KMCO 281 elif data[1] == "-" and len(data[0]) == 4: 282 station = data.pop(0) 283 data.pop(0) 284 else: 285 station = None 286 return data, area, report_type, start_time, end_time, station 287 288 289def _first_index(data: list[str], *targets: str) -> int: 290 for target in targets: 291 with suppress(ValueError): 292 return data.index(target) 
293 return -1 294 295 296def _region(data: list[str]) -> tuple[list[str], str]: 297 # FIR/CTA region name 298 # Or non-standard name using lookahead Ex: FL CSTL WTRS FROM 100SSW 299 name_end = _first_index(data, "FIR", "CTA") + 1 or _first_index(data, "FROM") 300 # State list 301 if not name_end: 302 for item in data: 303 if len(item) == 2: 304 name_end += 1 305 else: 306 break 307 name = " ".join(data[:name_end]) 308 return data[name_end:], name 309 310 311def _time(data: list[str], issued: date | None = None) -> tuple[list[str], Timestamp | None, Timestamp | None]: 312 """Extracts the start and/or end time based on a couple starting elements""" 313 index = _first_index(data, "AT", "FCST", "UNTIL", "VALID", "OUTLOOK", "OTLK") 314 if index == -1: 315 return data, None, None 316 start_item = data.pop(index) 317 start, end, observed = None, None, None 318 if "-" in data[index]: 319 start_item, end_item = data.pop(index).split("-") 320 start = core.make_timestamp(start_item, time_only=len(start_item) < 6, target_date=issued) 321 end = core.make_timestamp(end_item, time_only=len(end_item) < 6, target_date=issued) 322 elif len(data[index]) >= 4 and data[index][:4].isdigit(): 323 observed = core.make_timestamp(data.pop(index), time_only=True, target_date=issued) 324 if index > 0 and data[index - 1] == "OBS": 325 data.pop(index - 1) 326 for remv in ("FCST", "OUTLOOK", "OTLK", "VALID"): 327 with suppress(ValueError): 328 data.remove(remv) 329 if observed: 330 if start_item in ("UNTIL", "VALID"): 331 end = observed 332 else: 333 start = observed 334 return data, start, end 335 336 337def _coord_value(value: str) -> float: 338 if value[0] in ("N", "S"): 339 index, strip, replace = 3, "N", "S" 340 else: 341 index, strip, replace = 4, "E", "W" 342 num = f"{value[:index]}.{value[index:]}".lstrip(strip).replace(replace, "-") 343 return float(num) 344 345 346def _position(data: list[str]) -> tuple[list[str], Coord | None]: 347 try: 348 index = data.index("PSN") 349 except 
ValueError: 350 return data, None 351 data.pop(index) 352 raw = f"{data[index]} {data[index + 1]}" 353 lat = _coord_value(data.pop(index)) 354 lon = _coord_value(data.pop(index)) 355 return data, Coord(lat=lat, lon=lon, repr=raw) 356 357 358def _movement(data: list[str], units: Units) -> tuple[list[str], Units, Movement | None]: 359 with suppress(ValueError): 360 data.remove("STNR") 361 speed = core.make_number("STNR") 362 return data, units, Movement(repr="STNR", direction=None, speed=speed) 363 try: 364 index = data.index("MOV") 365 except ValueError: 366 return data, units, None 367 raw = data.pop(index) 368 direction_str = data.pop(index) 369 # MOV CNL 370 if direction_str == "CNL": 371 return data, units, None 372 raw += f" {direction_str} " 373 # MOV FROM 23040KT 374 if direction_str == "FROM": 375 value = data[index][:3] 376 raw += value 377 direction = core.make_number(value) 378 data[index] = data[index][3:] 379 # MOV E 45KMH 380 else: 381 direction = core.make_number(direction_str.replace("/", ""), literal=True, special=CARDINAL_DEGREES) 382 speed = None 383 with suppress(IndexError): 384 kt_unit, kmh_unit = data[index].endswith("KT"), data[index].endswith("KMH") 385 if kt_unit or kmh_unit: 386 units.wind_speed = "kmh" if kmh_unit else "kt" 387 speed_str = data.pop(index) 388 raw += speed_str 389 # Remove bottom speed Ex: MOV W 05-10KT 390 if "-" in speed_str: 391 speed_str = speed_str[speed_str.find("-") + 1 :] 392 speed = core.make_number(speed_str[: -3 if kmh_unit else -2]) 393 return data, units, Movement(repr=raw.strip(), direction=direction, speed=speed) 394 395 396def _info_from_match(match: re.Match, start: int) -> tuple[str, int]: 397 """Returns the matching text and starting location if none yet available""" 398 if start == -1: 399 start = match.start() 400 return match.group(), start 401 402 403def _pre_break(report: str) -> str: 404 break_index = report.find(" <break> ") 405 return report[:break_index] if break_index != -1 else report 406 407 
408def _bounds_from_latterals(report: str, start: int) -> tuple[str, list[str], int]: 409 """Extract coordinate latterals from report Ex: N OF N2050""" 410 bounds = [] 411 for match in _LATTERAL_PATTERN.finditer(_pre_break(report)): 412 group, start = _info_from_match(match, start) 413 bounds.append(group.removesuffix(" AND")) 414 report = report.replace(group, " ") 415 return report, bounds, start 416 417 418def _coords_from_text(report: str, start: int) -> tuple[str, list[Coord], int]: 419 """Extract raw coordinate values from report Ex: N4409 E01506""" 420 coords = [] 421 for match in _COORD_PATTERN.finditer(_pre_break(report)): 422 group, start = _info_from_match(match, start) 423 text = group.strip(" -") 424 lat, lon = text.split() 425 coord = Coord(lat=_coord_value(lat), lon=_coord_value(lon), repr=text) 426 coords.append(coord) 427 report = report.replace(group, " ") 428 return report, coords, start 429 430 431def _coords_from_navaids(report: str, start: int) -> tuple[str, list[Coord], int]: 432 """Extract navaid referenced coordinates from report Ex: 30SSW BNA""" 433 coords, navs = [], [] 434 for match in _NAVAID_PATTERN.finditer(_pre_break(report)): 435 group, start = _info_from_match(match, start) 436 report = report.replace(group, " ") 437 group = group.strip("-").removeprefix("FROM ").removeprefix("TO ") 438 navs.append((group, *group.split())) 439 locs = to_coordinates([n[2 if len(n) == 3 else 1] for n in navs]) 440 for i, nav in enumerate(navs): 441 value = nav[0] 442 if len(nav) == 3: 443 vector, num_index = nav[1], 0 444 while vector[num_index].isdigit(): 445 num_index += 1 446 distance, bearing = ( 447 int(vector[:num_index]), 448 CARDINAL_DEGREES[vector[num_index:]], 449 ) 450 loc = geo_distance(nautical=distance).destination(locs[i].pair, bearing=bearing) 451 coord = Coord(lat=loc.latitude, lon=loc.longitude, repr=value) 452 else: 453 coord = locs[i] 454 coord.repr = value 455 coords.append(coord) 456 return report, coords, start 457 458 459def 
_bounds(data: list[str]) -> tuple[list[str], list[Coord], list[str]]: 460 """Extract coordinate bounds by coord, navaid, and latterals""" 461 report, start = " ".join(data), -1 462 report, bounds, start = _bounds_from_latterals(report, start) 463 report, coords, start = _coords_from_text(report, start) 464 report, navs, start = _coords_from_navaids(report, start) 465 coords += navs 466 for target in ("FROM", "WI", "BOUNDED", "OBS"): 467 index = report.find(f"{target} ") 468 if index != -1 and index < start: 469 start = index 470 report = report[:start] + report[report.rfind(" ") :] 471 data = [s for s in report.split() if s] 472 return data, coords, bounds 473 474 475def _altitudes(data: list[str], units: Units) -> tuple[list[str], Units, Number | None, Number | None]: 476 """Extract the floor and ceiling altitudes""" 477 floor, ceiling = None, None 478 for i, item in enumerate(data): 479 # BTN FL180 AND FL330 480 if item == "BTN" and len(data) > i + 2 and data[i + 2] == "AND": 481 floor, units = core.make_altitude(data[i + 1], units) 482 ceiling, units = core.make_altitude(data[i + 3], units) 483 data = data[:i] + data[i + 4 :] 484 break 485 # TOPS ABV FL450 486 if item in ("TOP", "TOPS", "BLW"): 487 if data[i + 1] == "ABV": 488 ceiling = core.make_number(f"ABV {data[i + 2]}") 489 data = data[:i] + data[i + 3 :] 490 break 491 if data[i + 1] == "BLW": 492 ceiling = core.make_number(f"BLW {data[i + 2]}") 493 data = data[:i] + data[i + 3 :] 494 break 495 # TOPS TO FL310 496 if data[i + 1] == "TO": 497 data.pop(i) 498 ceiling, units = core.make_altitude(data[i + 1], units) 499 data = data[:i] + data[i + 2 :] 500 # CIG BLW 010 501 if data[i - 1] == "CIG": 502 data.pop(i - 1) 503 break 504 # FL060/300 SFC/FL160 505 if core.is_altitude(item): 506 if "/" in item: 507 floor_val, ceiling_val = item.split("/") 508 floor, units = core.make_altitude(floor_val, units) 509 if (floor_val == "SFC" or floor_val[:2] == "FL") and ceiling_val[:2] != "FL": 510 ceiling, units = 
core.make_altitude(ceiling_val, units, force_fl=True) 511 else: 512 ceiling, units = core.make_altitude(ceiling_val, units) 513 else: 514 ceiling, units = core.make_altitude(item, units) 515 data.pop(i) 516 break 517 return data, units, floor, ceiling 518 519 520def _weather_type(data: list[str]) -> tuple[list[str], Code | None]: 521 weather = None 522 report = " ".join(data) 523 for key, val in WEATHER_TYPES.items(): 524 if key in report: 525 weather = Code(repr=key, value=val) 526 data = [i for i in report.replace(key, "").split() if i] 527 break 528 return data, weather 529 530 531def _intensity(data: list[str]) -> tuple[list[str], Code | None]: 532 if not data: 533 return data, None 534 try: 535 value = INTENSITY[data[-1]] 536 code = data.pop() 537 return data, Code(repr=code, value=value) 538 except KeyError: 539 return data, None 540 541 542def _sigmet_observation(data: list[str], units: Units, issued: date | None = None) -> tuple[AirSigObservation, Units]: 543 data, start_time, end_time = _time(data, issued) 544 data, position = _position(data) 545 data, coords, bounds = _bounds(data) 546 data, units, movement = _movement(data, units) 547 data, intensity = _intensity(data) 548 data, units, floor, ceiling = _altitudes(data, units) 549 data, weather = _weather_type(data) 550 struct = AirSigObservation( 551 type=weather, 552 start_time=start_time, 553 end_time=end_time, 554 position=position, 555 floor=floor, 556 ceiling=ceiling, 557 coords=coords, 558 bounds=bounds, 559 movement=movement, 560 intensity=intensity, 561 other=_clean_flags(data), 562 ) 563 return struct, units 564 565 566def _observations( 567 data: list[str], units: Units, issued: date | None = None 568) -> tuple[Units, AirSigObservation | None, AirSigObservation | None]: 569 observation, forecast, forecast_index = None, None, -1 570 forecast_index = _first_index(data, "FCST", "OUTLOOK", "OTLK") 571 if forecast_index == -1: 572 observation, units = _sigmet_observation(data, units, issued) 573 # 6 
is arbitrary. Will likely change or be more precise later 574 elif forecast_index < 6: 575 forecast, units = _sigmet_observation(data, units, issued) 576 else: 577 observation, units = _sigmet_observation(data[:forecast_index], units, issued) 578 forecast, units = _sigmet_observation(data[forecast_index:], units, issued) 579 return units, observation, forecast 580 581 582_REPLACE = { 583 " MO V ": " MOV ", 584 " STNRY": " STNR", 585 " STCNRY": " STNR", 586 " N-NE ": " NNE ", 587 " N-NW ": " NNW ", 588 " E-NE ": " ENE ", 589 " E-SE ": " ESE ", 590 " S-SE ": " SSE ", 591 " S-SW ": " SSW ", 592 " W-SW ": " WSW ", 593 " W-NW ": " WNW ", 594} 595 596 597def _find_first_digit(item: str) -> int: 598 return next((i for i, char in enumerate(item) if char.isdigit()), -1) 599 600 601def sanitize(report: str) -> str: 602 """Sanitized AIRMET / SIGMET report string""" 603 report = report.strip(" =") 604 for key, val in _REPLACE.items(): 605 report = report.replace(key, val) 606 data = report.split() 607 for i, item in reversed(list(enumerate(data))): 608 # Remove extra element on altitude Ex: FL450Z skip 1000FT 609 if ( 610 len(item) > 4 611 and not item[-1].isdigit() 612 and item[-2:] != "FT" 613 and item[-1] != "M" 614 and core.is_altitude(item[:-1]) 615 ): 616 data[i] = item[:-1] 617 # Split attached movement direction Ex: NE05KT 618 if len(item) >= 4 and item.endswith(("KT", "KMH")) and item[: _find_first_digit(item)] in CARDINALS: 619 index = _find_first_digit(item) 620 direction = item[:index] 621 data.insert(i + 1, item[index:]) 622 data[i] = direction 623 return " ".join(data) 624 625 626def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]: 627 """Parse AIRMET / SIGMET report string""" 628 units = Units.international() 629 sanitized = sanitize(report) 630 data, bulletin, issuer, time, correction = _header(_parse_prep(sanitized)) 631 data, area, report_type, start_time, end_time, station = _spacetime(data) 632 body = sanitized[sanitized.find(" 
".join(data[:2])) :] 633 # Trim AIRMET type 634 if data[0] == "AIRMET": 635 with suppress(ValueError): 636 data = data[data.index("<elip>") + 1 :] 637 data, region = _region(data) 638 units, observation, forecast = _observations(data, units, issued) 639 struct = AirSigmetData( 640 raw=report, 641 sanitized=sanitized, 642 station=station, 643 time=core.make_timestamp(time, target_date=issued), 644 remarks=None, 645 bulletin=bulletin, 646 issuer=issuer, 647 correction=correction, 648 area=area, 649 type=report_type, 650 start_time=core.make_timestamp(start_time, target_date=issued), 651 end_time=core.make_timestamp(end_time, target_date=issued), 652 body=body, 653 region=region, 654 observation=observation, 655 forecast=forecast, 656 ) 657 return struct, units
class AirSigmet(AVWXBase):
    """
    In addition to the manager, you can use the `avwx.AirSigmet` class like any
    other report when you supply the report string via `parse` or
    `from_report`.

    ```python
    >>> from avwx import AirSigmet
    >>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
    >>> sigmet = AirSigmet.from_report(report)
    >>> sigmet.last_updated
    datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
    >>> sigmet.data.observation.coords
    [Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
    Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
    Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
    Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
    >>> sigmet.data.observation.intensity
    Code(repr='NC', value='No change')
    >>> sigmet.data.observation.ceiling
    Number(repr='FL410', value=410, spoken='flight level four one zero')
    ```
    """

    data: AirSigmetData | None = None

    def _post_parse(self) -> None:
        """Fills `data` and `units` by parsing `raw`; does nothing without a raw report"""
        if not self.raw:
            return
        self.data, self.units = parse(self.raw, self.issued)

    @staticmethod
    def sanitize(report: str) -> str:
        """Sanitizes the report string"""
        cleaned = sanitize(report)
        return cleaned

    def intersects(self, path: LineString) -> bool:
        """Returns True if the observed or forecast area intersects a flight path

        Raises MissingExtraModule when shapely could not be imported.
        """
        if LineString is None:
            raise MissingExtraModule("shape")
        if not self.data:
            return False
        for section in (self.data.observation, self.data.forecast):
            if not section:
                continue
            area = section.poly
            if area and path.intersects(area):
                return True
        return False

    def contains(self, coord: Coord) -> bool:
        """Returns True if the observed or forecast area contains a coordinate"""
        if not self.data:
            return False
        for section in (self.data.observation, self.data.forecast):
            if not section:
                continue
            area = section.poly
            if area and coord.point.within(area):
                return True
        return False
In addition to the manager, you can use the `avwx.AirSigmet` class like any
other report when you supply the report string via `parse` or `from_report`.
>>> from avwx import AirSigmet
>>> report = 'WSPR31 SPJC 270529 SPIM SIGMET 3 VALID 270530/270830 SPJC- SPIM LIMA FIR EMBD TS OBS AT 0510Z NE OF LINE S0406 W07103 - S0358 W07225 - S0235 W07432 - S0114 W07503 TOP FL410 MOV SW NC='
>>> sigmet = AirSigmet.from_report(report)
>>> sigmet.last_updated
datetime.datetime(2022, 3, 27, 6, 29, 33, 300935, tzinfo=datetime.timezone.utc)
>>> sigmet.data.observation.coords
[Coord(lat=-4.06, lon=-71.03, repr='S0406 W07103'),
Coord(lat=-3.58, lon=-72.25, repr='S0358 W07225'),
Coord(lat=-2.35, lon=-74.32, repr='S0235 W07432'),
Coord(lat=-1.14, lon=-75.03, repr='S0114 W07503')]
>>> sigmet.data.observation.intensity
Code(repr='NC', value='No change')
>>> sigmet.data.observation.ceiling
Number(repr='FL410', value=410, spoken='flight level four one zero')
@staticmethod
def sanitize(report: str) -> str:
    """Sanitizes the report string by delegating to the module-level `sanitize`"""
    cleaned = sanitize(report)
    return cleaned
Sanitizes the report string
def intersects(self, path: LineString) -> bool:
    """Returns True if the observed or forecast area intersects a flight path

    Raises MissingExtraModule when shapely could not be imported.
    """
    if LineString is None:
        raise MissingExtraModule("shape")
    if not self.data:
        return False
    for section in (self.data.observation, self.data.forecast):
        if not section:
            continue
        area = section.poly
        if area and path.intersects(area):
            return True
    return False
Returns True if the report area intersects a flight path
def contains(self, coord: Coord) -> bool:
    """Returns True if the observed or forecast area contains a coordinate

    False is returned when no report data is loaded or neither section has a
    polygon containing the point.
    """
    if not self.data:
        return False
    for section in (self.data.observation, self.data.forecast):
        if not section:
            continue
        area = section.poly
        if area and coord.point.within(area):
            return True
    return False
Returns True if the report area contains a coordinate
Inherited Members
class AirSigManager:
    """
    Because of the global nature of these report types, we don't initialize a
    report class with a station ident like the other report types. Instead, we
    use a class to manage and update the list of all active SIGMET and AIRMET
    reports.

    ```python
    >>> from avwx import AirSigManager
    >>> from avwx.structs import Coord
    >>> manager = AirSigManager()
    >>> manager.update()
    True
    >>> manager.last_updated
    datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
    >>> len(manager.reports)
    113
    >>> len(manager.contains(Coord(lat=33.12, lon=-105)))
    5
    >>> manager.reports[0].data.bulletin.type
    Code(repr='WA', value='airmet')
    >>> manager.reports[0].data.type
    'AIRMET SIERRA FOR IFR AND MTN OBSCN'
    ```
    """

    _services: list[Service]
    # (report text, service root) pairs from the most recent fetch
    _raw: list[tuple[str, str | None]]
    last_updated: datetime | None = None
    raw: list[str]
    reports: list[AirSigmet] | None = None

    def __init__(self):  # type: ignore
        self._services = [NoaaBulk("airsigmet"), NoaaIntl("airsigmet")]
        self._raw = []
        self.raw = []

    async def _update(self, index: int, timeout: int) -> list[tuple[str, str | None]]:
        """Fetches one service and pairs each non-empty report with the service root"""
        service = self._services[index]
        source = service.root
        fetched = await self._services[index].async_fetch(timeout=timeout)  # type: ignore
        pairs: list[tuple[str, str | None]] = []
        for text in fetched:
            if text:
                pairs.append((text, source))
        return pairs

    def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed"""
        pending = self.async_update(timeout, disable_post=disable_post)
        return aio.run(pending)

    async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
        """Updates fetched reports and returns whether they've changed

        Every service is fetched concurrently. An unchanged result leaves the
        manager untouched. Unless `disable_post` is set, each report is parsed
        and reports that raise are passed to `exceptions.exception_intercept`.
        """
        fetches = [self._update(idx, timeout) for idx, _ in enumerate(self._services)]
        batches = await aio.gather(*fetches)
        fetched = list(chain.from_iterable(batches))
        texts = [pair[0] for pair in fetched]
        if fetched == self._raw:
            return False
        self._raw = fetched
        self.raw = texts
        self.last_updated = datetime.now(tz=timezone.utc)
        if disable_post:
            return True
        parsed = []
        for report, source in fetched:
            try:
                obj = AirSigmet.from_report(report)
                if obj:
                    obj.source = source
                    parsed.append(obj)
            except Exception as exc:  # noqa: BLE001
                exceptions.exception_intercept(exc, raw={"report": report})
        self.reports = parsed
        return True

    def along(self, coords: list[Coord]) -> list[AirSigmet]:
        """Returns available reports that intersect a flight path

        Raises MissingExtraModule when shapely could not be imported.
        """
        if LineString is None:
            raise MissingExtraModule("shape")
        if self.reports is None:
            return []
        path = LineString([point.pair for point in coords])
        matches = []
        for candidate in self.reports:
            if candidate.intersects(path):
                matches.append(candidate)
        return matches

    def contains(self, coord: Coord) -> list[AirSigmet]:
        """Returns available reports that contain a coordinate"""
        if self.reports is None:
            return []
        matches = []
        for candidate in self.reports:
            if candidate.contains(coord):
                matches.append(candidate)
        return matches
Because of the global nature of these report types, we don't initialize a report class with a station ident like the other report types. Instead, we use a class to manage and update the list of all active SIGMET and AIRMET reports.
>>> from avwx import AirSigManager
>>> from avwx.structs import Coord
>>> manager = AirSigManager()
>>> manager.update()
True
>>> manager.last_updated
datetime.datetime(2022, 3, 27, 5, 54, 21, 516741, tzinfo=datetime.timezone.utc)
>>> len(manager.reports)
113
>>> len(manager.contains(Coord(lat=33.12, lon=-105)))
5
>>> manager.reports[0].data.bulletin.type
Code(repr='WA', value='airmet')
>>> manager.reports[0].data.type
'AIRMET SIERRA FOR IFR AND MTN OBSCN'
def update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
    """Updates fetched reports and returns whether they've changed

    Synchronous wrapper that runs `async_update` to completion with
    `asyncio.run`.
    """
    pending = self.async_update(timeout, disable_post=disable_post)
    return aio.run(pending)
Updates fetched reports and returns whether they've changed
async def async_update(self, timeout: int = 10, *, disable_post: bool = False) -> bool:
    """Updates fetched reports and returns whether they've changed

    Every service is fetched concurrently. An unchanged result leaves the
    manager untouched and returns False. Otherwise `_raw`, `raw` and
    `last_updated` are refreshed and, unless `disable_post` is set, each report
    is parsed into `reports`; a report that raises is passed to
    `exceptions.exception_intercept` and skipped.
    """
    fetches = [self._update(idx, timeout) for idx, _ in enumerate(self._services)]
    batches = await aio.gather(*fetches)
    fetched = list(chain.from_iterable(batches))
    texts = [pair[0] for pair in fetched]
    if fetched == self._raw:
        return False
    self._raw = fetched
    self.raw = texts
    self.last_updated = datetime.now(tz=timezone.utc)
    if disable_post:
        return True
    parsed = []
    for report, source in fetched:
        try:
            obj = AirSigmet.from_report(report)
            if obj:
                obj.source = source
                parsed.append(obj)
        except Exception as exc:  # noqa: BLE001
            exceptions.exception_intercept(exc, raw={"report": report})
    self.reports = parsed
    return True
Updates fetched reports and returns whether they've changed
def along(self, coords: list[Coord]) -> list[AirSigmet]:
    """Returns available reports that intersect a flight path

    The path is a LineString built from each coordinate's `pair`. An empty list
    is returned when no reports have been parsed yet. Raises
    MissingExtraModule when shapely could not be imported.
    """
    if LineString is None:
        raise MissingExtraModule("shape")
    if self.reports is None:
        return []
    path = LineString([point.pair for point in coords])
    matches = []
    for candidate in self.reports:
        if candidate.intersects(path):
            matches.append(candidate)
    return matches
Returns available reports that intersect a flight path
def sanitize(report: str) -> str:
    """Sanitize an AIRMET / SIGMET report string

    Strips surrounding spaces and "=", applies the `_REPLACE` corrections, drops
    a stray trailing character from altitude items, and splits a cardinal
    direction fused to a speed (Ex: NE05KT) into two items.
    """
    text = report.strip(" =")
    for wrong, right in _REPLACE.items():
        text = text.replace(wrong, right)
    tokens = text.split()
    # Walk backwards so inserting a split-off speed never shifts an unvisited index
    for pos in range(len(tokens) - 1, -1, -1):
        token = tokens[pos]
        # Remove extra element on altitude Ex: FL450Z skip 1000FT
        has_stray_suffix = len(token) > 4 and not token[-1].isdigit() and token[-2:] != "FT" and token[-1] != "M"
        if has_stray_suffix and core.is_altitude(token[:-1]):
            tokens[pos] = token[:-1]
        # Split attached movement direction Ex: NE05KT
        if len(token) < 4 or not token.endswith(("KT", "KMH")):
            continue
        digit_at = _find_first_digit(token)
        if token[:digit_at] not in CARDINALS:
            continue
        tokens.insert(pos + 1, token[digit_at:])
        tokens[pos] = token[:digit_at]
    return " ".join(tokens)
Sanitized AIRMET / SIGMET report string
def parse(report: str, issued: date | None = None) -> tuple[AirSigmetData, Units]:
    """Parse AIRMET / SIGMET report string

    The report is sanitized, split into flagged items, and then consumed in
    stages: header, area/validity, region, and the observation/forecast
    sections. Returns the populated AirSigmetData and the Units, which start as
    `Units.international()`.
    """
    units = Units.international()
    sanitized = sanitize(report)
    tokens = _parse_prep(sanitized)
    tokens, bulletin, issuer, issue_time, correction = _header(tokens)
    tokens, area, report_type, valid_from, valid_to, station = _spacetime(tokens)
    # Body is the sanitized text from the first two remaining items onward
    body_start = sanitized.find(" ".join(tokens[:2]))
    body = sanitized[body_start:]
    # Trim AIRMET type
    if tokens[0] == "AIRMET":
        try:
            tokens = tokens[tokens.index("<elip>") + 1 :]
        except ValueError:
            pass
    tokens, region = _region(tokens)
    units, observation, forecast = _observations(tokens, units, issued)
    issued_at = core.make_timestamp(issue_time, target_date=issued)
    starts_at = core.make_timestamp(valid_from, target_date=issued)
    ends_at = core.make_timestamp(valid_to, target_date=issued)
    struct = AirSigmetData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=issued_at,
        remarks=None,
        bulletin=bulletin,
        issuer=issuer,
        correction=correction,
        area=area,
        type=report_type,
        start_time=starts_at,
        end_time=ends_at,
        body=body,
        region=region,
        observation=observation,
        forecast=forecast,
    )
    return struct, units
Parse AIRMET / SIGMET report string