avwx.parsing.core
Contains the core parsing and indent functions of avwx.
"""Contains the core parsing and indent functions of avwx.

The helpers here work on a report that has already been split into a list of
string elements. Most ``get_*`` functions pop the elements they recognize from
that list (mutating it in place) and return the list alongside the parsed
values.
"""

# stdlib
from __future__ import annotations

import datetime as dt
import math
import re
from calendar import monthrange
from contextlib import suppress
from copy import copy
from typing import TYPE_CHECKING, Any

# library
from dateutil.relativedelta import relativedelta

# module
from avwx.static.core import (
    CARDINALS,
    CLOUD_LIST,
    FRACTIONS,
    NUMBER_REPL,
    SPECIAL_NUMBERS,
    WIND_UNITS,
)
from avwx.structs import Cloud, Fraction, Number, Timestamp, Units

if TYPE_CHECKING:
    from collections.abc import Iterable


def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
    """Deduplicate a list while keeping order.

    If only_neighbors is True, dedupe will only check neighboring values.
    """
    ret: list[Any] = []
    for item in items:
        # Membership is tested against a list, so unhashable items are supported
        if (only_neighbors and ret and ret[-1] != item) or item not in ret:
            ret.append(item)
    return ret


def is_unknown(value: str) -> bool:
    """Return True if value represents an unknown value.

    Empty strings, UNKN/UNK/UKN (any case), and strings made only of
    "/", "X", and "." count as unknown. Raises TypeError for non-strings.
    """
    if not isinstance(value, str):
        raise TypeError
    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
        return True
    for char in value:
        if char not in ("/", "X", "."):
            break
    else:
        # Loop finished without a break: every character was a placeholder
        return True
    return False


def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
    """Return a list of items removed from a given list of strings
    that are all digits from 'from_index' until hitting a non-digit item.

    The item at 'from_index' itself is removed and discarded first.
    """
    ret = []
    data.pop(from_index)
    while len(data) > from_index and data[from_index].isdigit():
        ret.append(data.pop(from_index))
    return data, ret


def unpack_fraction(num: str) -> str:
    """Return unpacked fraction string 5/2 -> 2 1/2.

    Strings that are not two "/"-separated integers, or whose numerator is
    not greater than the denominator, are returned unchanged.
    """
    numbers = [int(n) for n in num.split("/") if n]
    if len(numbers) != 2 or numbers[0] <= numbers[1]:
        return num
    numerator, denominator = numbers
    over = numerator // denominator
    rem = numerator % denominator
    return f"{over} {rem}/{denominator}"


def remove_leading_zeros(num: str) -> str:
    """Strip zeros while handling -, M, and empty strings."""
    if not num:
        return num
    if num.startswith("M"):
        ret = "M" + num[1:].lstrip("0")
    elif num.startswith("-"):
        ret = "-" + num[1:].lstrip("0")
    else:
        ret = num.lstrip("0")
    # Nothing but zeros (and possibly a sign) collapses to a plain "0"
    return "0" if ret in ("", "M", "-") else ret


# Spoken digit endings and their condensed replacements, checked in order
SPOKEN_POSTFIX = (
    (" zero zero zero", " thousand"),
    (" zero zero", " hundred"),
)


def spoken_number(num: str, *, literal: bool = False) -> str:
    """Return the spoken version of a number.

    If literal, no conversion to hundreds/thousands

    Ex: 1.2 -> one point two
        1 1/2 -> one and one half
        25000 -> two five thousand
    """
    ret = []
    for part in num.split():
        if part in FRACTIONS:
            ret.append(FRACTIONS[part])
        else:
            # Characters without a NUMBER_REPL entry are silently dropped
            val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
            if not literal:
                for target, replacement in SPOKEN_POSTFIX:
                    if val.endswith(target):
                        val = val[: -len(target)] + replacement
            ret.append(val)
    return " and ".join(ret)


def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a fraction dataclass for numbers with / in them."""
    num_str, den_str = num.split("/")
    # 2-1/2 but not -2 1/2
    if "-" in num_str and not num_str.startswith("-"):
        num_str = num_str.replace("-", " ")
    denominator = int(den_str)
    # Multiply multi-digit numerator
    # The last character is the numerator; everything before it is a whole number
    if len(num_str) > 1:
        numerator = int(num_str[:-1]) * denominator + int(num_str[-1])
        num = f"{numerator}/{denominator}"
    else:
        numerator = int(num_str)
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)


def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    If literal, spoken string will not convert to hundreds/thousands.

    Returns None for empty or unknown input.

    NOTE: Numerators are assumed to have a single digit. Additional digits are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special
    # A KeyError from SPECIAL_NUMBERS means "not special": fall through to normal parsing
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE: these cleanups apply to num only; val_str picks them up only in the minus branch below
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        # Overwrite float 0 due to "0.0" literal
        # Parsed as: (float(val_str) or 0) if "." in num else int(val_str)
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret


def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the index of the earliest occurrence of an item from a list in a string.

    Returns -1 if none of the items are found.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    # Start past the end of the string so any real match is smaller
    start = len(txt) + 1
    for item in str_list:
        if start > txt.find(item) > -1:
            start = txt.find(item)
    return start if len(txt) + 1 > start > -1 else -1


def is_timestamp(item: str) -> bool:
    """Return True if the item matches the timestamp format."""
    return len(item) == 7 and item[-1] == "Z" and item[:-1].isdigit()


def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF to-from time range."""
    return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit()


def is_possible_temp(temp: str) -> bool:
    """Return True if all characters are digits or 'M' for minus."""
    return all((char.isdigit() or char == "M") for char in temp)


_Numeric = int | float


def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
    """Calculate the relative humidity as a 0 to 1 percentage."""

    def saturation(value: _Numeric) -> float:
        """Return the saturation vapor pressure without the C constant for humidity calc."""
        return math.exp((17.67 * value) / (243.5 + value))

    if unit == "F":
        dewpoint = (dewpoint - 32) * 5 / 9
        temperature = (temperature - 32) * 5 / 9
    return saturation(dewpoint) / saturation(temperature)


# https://aviation.stackexchange.com/questions/47971/how-do-i-calculate-density-altitude-by-hand


def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude in feet. Converts pressure units."""
    if unit == "hPa":
        pressure *= 0.02953
    return round((29.92 - pressure) * 1000 + altitude)


def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude in feet. Converts pressure and temperature units."""
    if units.temperature == "F":
        temperature = (temperature - 32) * 5 / 9
    if units.altimeter == "hPa":
        pressure *= 0.02953
    # Pressure is already converted here, so the default inHg unit is used
    pressure_alt = pressure_altitude(pressure, altitude)
    standard = 15 - (2 * altitude / 1000)
    return round(((temperature - standard) * 120) + pressure_alt)


def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Return the report list and removed station ident and time strings."""
    if not data:
        return data, None, None
    station = data.pop(0)
    if not data:
        return data, station, None
    q_time, r_time = data[0], None
    if data and q_time.endswith("Z") and q_time[:-1].isdigit():
        r_time = data.pop(0)
    # Six bare digits are accepted as a time with the "Z" suffix missing
    elif data and len(q_time) == 6 and q_time.isdigit():
        r_time = f"{data.pop(0)}Z"
    return data, station, r_time


def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element."""
    # Ignore wind shear
    if text.startswith("WS"):
        return False
    # 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            unit_index = text.find(ending)
            if text.endswith(ending) and text[unit_index - 2 : unit_index].isdigit():
                return True
    # 09010 09010G15 VRB10
    if len(text) != 5 and (len(text) < 8 or "G" not in text or "/" in text):
        return False
    return text[:5].isdigit() or (text.startswith("VRB") and text[3:5].isdigit())


VARIABLE_DIRECTION_PATTERN = re.compile(r"\d{3}V\d{3}")


def is_variable_wind_direction(text: str) -> bool:
    """Return True if element looks like 350V040."""
    if len(text) < 7:
        return False
    return VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None


def separate_wind(text: str) -> tuple[str, str, str]:
    """Extract the direction, speed, and gust from a wind element."""
    direction, speed, gust = "", "", ""
    # Remove gust
    if "G" in text:
        g_index = text.find("G")
        start, end = g_index + 1, g_index + 3
        # 16006GP99KT ie gust greater than
        if "GP" in text:
            end += 1
        gust = text[start:end]
        text = text[:g_index] + text[end:]
    if text:
        # 10G18KT
        if len(text) == 2:
            speed = text
        else:
            direction = text[:3]
            speed = text[3:]
    return direction, speed, gust


def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Return the report list, direction, speed, gust, and variable direction list.

    Direction, speed, and gust are the results of make_number and may be None.
    Sets units.wind_speed when a known unit suffix is found.
    """
    direction, speed, gust = "", "", ""
    variable: list[Number] = []
    # Remove unit and split elements
    if data:
        item = copy(data[0])
        if is_wind(item):
            for key, unit in WIND_UNITS.items():
                if item.endswith(key):
                    units.wind_speed = unit
                    item = item.replace(key, "")
                    break
            direction, speed, gust = separate_wind(item)
            data.pop(0)
    # Separated Gust
    if data and 1 < len(data[0]) < 4 and data[0][0] == "G" and data[0][1:].isdigit():
        gust = data.pop(0)[1:]
    # Variable Wind Direction
    if data and is_variable_wind_direction(data[0]):
        for item in data.pop(0).split("V"):
            value = make_number(item, speak=item, literal=True)
            if value is not None:
                variable.append(value)
    # Convert to Number
    direction_value = make_number(direction, speak=direction, literal=True)
    speed_value = make_number(speed.strip("BV"), m_minus=False)
    gust_value = make_number(gust, m_minus=False)
    return data, direction_value, speed_value, gust_value, variable


def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Return the report list and the removed visibility as a number.

    Sets units.visibility to "sm" or "m" when a visibility element is found.
    """
    visibility = ""
    if data:
        item = copy(data[0])
        # Vis reported in statute miles
        if item.endswith("SM"):  # 10SM
            if item[:-2].isdigit():
                visibility = str(int(item[:-2]))
            elif "/" in item:
                visibility = item[: item.find("SM")]  # 1/2SM
            else:
                visibility = item[:-2]
            data.pop(0)
            units.visibility = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            units.visibility = "m"
        elif 7 >= len(item) >= 5 and item[:4].isdigit() and (item[4] in ["M", "N", "S", "E", "W"] or item[4:] == "NDV"):
            visibility = data.pop(0)[:4]
            units.visibility = "m"
        elif len(item) == 5 and item[1:].isdigit() and item[0] in ["M", "P", "B"]:
            visibility = data.pop(0)[1:]
            units.visibility = "m"
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit():
            vis1 = data.pop(0)  # 2
            vis2 = data.pop(0).replace("SM", "")  # 1/2
            # Assumes a single-digit numerator and denominator (vis2[0] and vis2[2])
            visibility = str(int(vis1) * int(vis2[2]) + int(vis2[0])) + vis2[1:]  # 5/2
            units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)


def sanitize_cloud(cloud: str) -> str:
    """Fix rare cloud layer issues."""
    if len(cloud) < 4:
        return cloud
    if not cloud[3].isdigit() and cloud[3] not in ("/", "-"):
        # Bad "O": FEWO03 -> FEW003
        if cloud[3] == "O":
            cloud = f"{cloud[:3]}0{cloud[4:]}"
        # Move modifiers to end: BKNC015 -> BKN015C
        elif cloud[3] != "U" and cloud[:4] not in {"BASE", "UNKN"}:
            cloud = cloud[:3] + cloud[4:] + cloud[3]
    return cloud


def _null_or_int(val: str | None) -> int | None:
    """Nullify unknown elements and convert ints."""
    return None if not isinstance(val, str) or is_unknown(val) else int(val)


# Longest suffix first so "-TOPS" is not matched as "-TOP" plus a stray "S"
_TOP_OFFSETS = ("-TOPS", "-TOP")


def make_cloud(cloud: str) -> Cloud:
    """Return a Cloud dataclass for a cloud string.

    This function assumes the input is potentially valid.
    """
    raw_cloud = cloud
    cloud_type = ""
    base: str | None = None
    top: str | None = None
    cloud = sanitize_cloud(cloud).replace("/", "")
    # Separate top
    for target in _TOP_OFFSETS:
        topi = cloud.find(target)
        if topi > -1:
            top, cloud = cloud[topi + len(target) :], cloud[:topi]
            break
    # Separate type
    ## BASE027
    if cloud.startswith("BASES"):
        cloud = cloud[5:]
    elif cloud.startswith("BASE"):
        cloud = cloud[4:]
    ## VV003
    elif cloud.startswith("VV"):
        cloud_type, cloud = cloud[:2], cloud[2:]
    ## FEW010
    elif len(cloud) >= 3 and cloud[:3] in CLOUD_LIST:
        cloud_type, cloud = cloud[:3], cloud[3:]
    ## BKN-OVC065
    if len(cloud) > 4 and cloud[0] == "-" and cloud[1:4] in CLOUD_LIST:
        cloud_type += cloud[:4]
        cloud = cloud[4:]
    # Separate base
    if len(cloud) >= 3 and cloud[:3].isdigit():
        base, cloud = cloud[:3], cloud[3:]
    elif len(cloud) >= 4 and cloud[:4] == "UNKN":
        cloud = cloud[4:]
    # Remainder is considered modifiers
    modifier = cloud or None
    # Make Cloud
    return Cloud(raw_cloud, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier)


def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Return the report list and removed list of split cloud layers."""
    clouds = []
    # Walk backwards so popping an item does not shift the indexes still to visit
    for i, item in reversed(list(enumerate(data))):
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            cloud = data.pop(i)
            clouds.append(make_cloud(cloud))
    # Attempt cloud sort. Fails if None values are present
    try:
        clouds.sort(key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        clouds.reverse()  # Restores original report order
    return data, clouds


def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return int based on current flight rules from parsed METAR data.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Parse visibility
    vis: _Numeric
    if visibility is None:
        vis = 2
    elif visibility.repr == "CAVOK" or visibility.repr.startswith("P6"):
        vis = 10
    elif visibility.repr.startswith("M"):
        vis = 0
    elif visibility.value is None:
        vis = 2
    # Convert meters to miles
    # A four-character repr is taken to mean the value was reported in meters
    elif len(visibility.repr) == 4:
        vis = (visibility.value or 0) * 0.000621371
    else:
        vis = visibility.value or 0
    # Parse ceiling
    # A missing ceiling or base falls back to 99 so it never lowers the category
    cld = (ceiling.base if ceiling else 99) or 99
    # Determine flight rules
    if (vis <= 5) or (cld <= 30):
        if (vis < 3) or (cld < 10):
            if (vis < 1) or (cld < 5):
                return 3  # LIFR
            return 2  # IFR
        return 1  # MVFR
    return 0  # VFR


def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return ceiling layer from Cloud-List or None if none found.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Prevents errors due to lack of cloud information (eg. '' or 'FEW///')
    """
    return next((c for c in clouds if c.base and c.type in {"OVC", "BKN", "VV"}), None)


def is_altitude(value: str) -> bool:
    """Return True if the value is a possible altitude."""
    if len(value) < 5:
        return False
    if value.startswith("SFC/"):
        return True
    if value.startswith("FL") and value[2:5].isdigit():
        return True
    # Otherwise only the part before the first "/" is checked, ex: 050FT
    first, *_ = value.split("/")
    return bool(first[-2:] == "FT" and first[-5:-2].isdigit())


def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing FT or M unit is stripped, stored in units.altitude, and
    disables force_fl.
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            force_fl = False
            units.altitude = end.lower()
            value = value.removesuffix(end)
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units


def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, assumes hhmm format with current or previous day.

    This function assumes the given timestamp is within the hour threshold from current date.

    Returns None if the string cannot be parsed.
    """
    # Format date string
    date = date.strip("Z")
    if not date.isdigit():
        return None
    if time_only:
        if len(date) != 4:
            return None
        index_hour = 0
    else:
        # ddhh is padded with zero minutes
        if len(date) == 4:
            date += "00"
        if len(date) != 6:
            return None
        index_hour = 2
    # Create initial guess
    if target:
        target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        target = dt.datetime.now(tz=dt.timezone.utc)
    day = target.day if time_only else int(date[:2])
    hour = int(date[index_hour : index_hour + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = False
    if day > monthrange(target.year, target.month)[1]:
        target += relativedelta(months=-1)
        shifted = True
    try:
        guess = target.replace(
            day=day,
            hour=hour % 24,
            minute=int(date[index_hour + 2 : index_hour + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not shifted:
        hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            guess += relativedelta(months=+1)
    return guess


def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format."""
    if not timestamp:
        return None
    date_obj = parse_date(timestamp, time_only=time_only, target=target_date)
    return Timestamp(timestamp, date_obj)


def is_runway_visibility(item: str) -> bool:
    """Return True if the item is a runway visibility range string."""
    return (
        len(item) > 4
        and item[0] == "R"
        and (item[3] == "/" or item[4] == "/")
        and item[1:3].isdigit()
        and "CLRD" not in item  # R28/CLRD70 Runway State
    )
def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
    """Return the items with duplicates removed, keeping first-seen order.

    When only_neighbors is True, only consecutive repeats are collapsed, so a
    value may appear again later in the result. Items are compared with
    equality against a list, so they do not need to be hashable.
    """
    unique: list[Any] = []
    for candidate in items:
        if only_neighbors and unique:
            # Neighbor mode: anything that differs from the last kept item is new
            is_new = unique[-1] != candidate or candidate not in unique
        else:
            is_new = candidate not in unique
        if is_new:
            unique.append(candidate)
    return unique
Deduplicate a list while keeping order.
If only_neighbors is True, dedupe will only check neighboring values.
def is_unknown(value: str) -> bool:
    """Return True if the value represents an unknown value.

    Unknown values are the empty string, UNKN/UNK/UKN in any letter case, and
    strings made up entirely of "/", "X", and "." placeholder characters.

    Raises TypeError if the value is not a string.
    """
    if not isinstance(value, str):
        raise TypeError
    if not value:
        return True
    if value.upper() in {"UNKN", "UNK", "UKN"}:
        return True
    # Every character must be a placeholder for the whole value to be unknown
    return all(char in ("/", "X", ".") for char in value)
Return True if the value represents an unknown value.
def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
    """Remove and collect the run of all-digit items found at 'from_index'.

    The item currently at 'from_index' is removed and discarded first. Items
    that then occupy that position are removed and collected for as long as
    they are all digits. The list is modified in place.

    Returns the same list object and the collected digit strings.
    """
    data.pop(from_index)
    digits: list[str] = []
    while True:
        if from_index >= len(data) or not data[from_index].isdigit():
            break
        digits.append(data.pop(from_index))
    return data, digits
Return a list of items removed from a given list of strings that are all digits from 'from_index' until hitting a non-digit item.
def unpack_fraction(num: str) -> str:
    """Return an improper fraction string as a mixed number: 5/2 -> 2 1/2.

    The input is returned unchanged when it is not exactly two "/"-separated
    integers, when the numerator is not greater than the denominator, or when
    the denominator is zero or negative and so cannot be unpacked.

    Raises ValueError if a non-empty part of the string is not an integer.
    """
    numbers = [int(part) for part in num.split("/") if part]
    if len(numbers) != 2:
        return num
    numerator, denominator = numbers
    # A zero denominator would raise ZeroDivisionError below and a negative
    # one would produce a nonsensical mixed number, so leave both untouched
    if denominator <= 0 or numerator <= denominator:
        return num
    over, rem = divmod(numerator, denominator)
    return f"{over} {rem}/{denominator}"
Return unpacked fraction string 5/2 -> 2 1/2.
def remove_leading_zeros(num: str) -> str:
    """Strip leading zeros, keeping a leading "M" or "-" sign in place.

    Empty strings are returned as-is. A value that is nothing but zeros, with
    or without a sign, becomes "0".
    """
    if not num:
        return num
    sign = num[0] if num[0] in ("M", "-") else ""
    stripped = sign + num[len(sign) :].lstrip("0")
    if stripped in ("", "M", "-"):
        return "0"
    return stripped
Strip zeros while handling -, M, and empty strings.
def spoken_number(num: str, *, literal: bool = False) -> str:
    """Return the spoken version of a number.

    Each whitespace-separated part is spoken on its own and the parts are
    joined with "and". Unless literal is set, trailing runs of zeros are
    condensed to hundreds/thousands.

    Ex: 1.2 -> one point two
        1 1/2 -> one and one half
        25000 -> two five thousand
    """

    def speak(part: str) -> str:
        """Return the spoken form of a single part."""
        if part in FRACTIONS:
            return FRACTIONS[part]
        # Characters with no spoken replacement are skipped
        words = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
        if literal:
            return words
        for target, replacement in SPOKEN_POSTFIX:
            if words.endswith(target):
                words = words[: -len(target)] + replacement
        return words

    return " and ".join(speak(part) for part in num.split())
Return the spoken version of a number.
If literal, no conversion to hundreds/thousands
Ex: 1.2 -> one point two; 1 1/2 -> one and one half; 25000 -> two five thousand
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a Fraction dataclass for a number string containing a "/".

    The text before the "/" may carry a whole number in front of a
    single-digit numerator (ex: "21/2" or "2-1/2" for two and a half), in
    which case it is folded into an improper fraction first. speak_prefix is
    put in front of the spoken text.
    """
    whole_and_numerator, den_str = num.split("/")
    # 2-1/2 but not -2 1/2
    if "-" in whole_and_numerator and not whole_and_numerator.startswith("-"):
        whole_and_numerator = whole_and_numerator.replace("-", " ")
    denominator = int(den_str)
    if len(whole_and_numerator) <= 1:
        numerator = int(whole_and_numerator)
    else:
        # Last character is the numerator, everything before it the whole number
        whole = int(whole_and_numerator[:-1])
        numerator = whole * denominator + int(whole_and_numerator[-1])
        num = f"{numerator}/{denominator}"
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)
Return a fraction dataclass for numbers with / in them.
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    Returns None when the input is empty or unknown, or when nothing is left
    after the recognized prefixes are removed.

    repr overrides the stored text representation and speak overrides the
    text used to build the spoken value. special maps strings to a value or
    a (value, spoken) tuple and is checked before SPECIAL_NUMBERS.

    If literal, spoken string will not convert to hundreds/thousands.

    If m_minus, an "M" in the number is read as a minus sign. Otherwise a
    leading "M"/"P" is spoken as "less than"/"greater than" and the numeric
    value is set to None when the repr starts with "M" or "P".

    NOTE: Numerators are assumed to have a single digit. Additional digits are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special
    # A KeyError from SPECIAL_NUMBERS means "not special": fall through to normal parsing
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes
    # NOTE: this replaces any repr passed in by the caller
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE: these cleanups apply to num only; val_str picks them up only in the minus branch below
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        # Drop anything in front of the minus sign
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        # Flight levels are always spoken digit by digit
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        # Overwrite float 0 due to "0.0" literal
        # Parsed as: (float(val_str) or 0) if "." in num else int(val_str)
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret
Return a Number or Fraction dataclass for a number string.
If literal, spoken string will not convert to hundreds/thousands.
NOTE: Numerators are assumed to have a single digit. Any additional leading digits are treated as a whole number.
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the lowest index at which any item from a list occurs in a string.

    Returns -1 when none of the items occur in the string.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = (txt.find(item) for item in str_list)
    # str.find reports a miss as -1, so only non-negative positions count
    return min((pos for pos in positions if pos > -1), default=-1)
Return the index of the earliest occurrence of an item from a list in a string.
Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
def is_timestamp(item: str) -> bool:
    """Return True if the item is six digits followed by "Z" (ex: 012345Z)."""
    if len(item) != 7:
        return False
    digits, suffix = item[:-1], item[-1]
    return suffix == "Z" and digits.isdigit()
Return True if the item matches the timestamp format.
def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF from/to time range (ex: 0112/0118)."""
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()
Return True if the item is a TAF to-from time range.
def is_possible_temp(temp: str) -> bool:
    """Return True if every character is a digit or 'M' for minus.

    An empty string has no offending characters and so returns True.
    """
    for char in temp:
        if char != "M" and not char.isdigit():
            return False
    return True
Return True if all characters are digits or 'M' for minus.
def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
    """Calculate the relative humidity as a 0 to 1 ratio.

    Temperature and dewpoint are converted from Fahrenheit to Celsius first
    when unit is "F". The result is the ratio of the two saturation vapor
    pressure terms; their shared constant factor cancels and is left out.
    """
    if unit == "F":
        dewpoint = (dewpoint - 32) * 5 / 9
        temperature = (temperature - 32) * 5 / 9
    dewpoint_term = math.exp((17.67 * dewpoint) / (243.5 + dewpoint))
    temperature_term = math.exp((17.67 * temperature) / (243.5 + temperature))
    return dewpoint_term / temperature_term
Calculate the relative humidity as a 0 to 1 percentage.
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude in feet, rounded to a whole number.

    Pressure is read as inches of mercury unless unit is "hPa", in which
    case it is converted first. Altitude is the field elevation in feet.
    """
    inches = pressure * 0.02953 if unit == "hPa" else pressure
    # 1000 ft for every inch of mercury away from the 29.92 standard
    offset = (29.92 - inches) * 1000
    return round(offset + altitude)
Calculate the pressure altitude in feet. Converts pressure units.
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude in feet, rounded to a whole number.

    Temperature is converted to Celsius when units.temperature is "F" and
    pressure to inches of mercury when units.altimeter is "hPa".
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    inches = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    # Standard temperature at this altitude: 15 C at zero, minus 2 C per 1000 ft
    standard_temp = 15 - (2 * altitude / 1000)
    deviation = (celsius - standard_temp) * 120
    # Pressure is already in inHg here, so the default unit applies
    return round(deviation + pressure_altitude(inches, altitude))
Calculate the density altitude in feet. Converts pressure and temperature units.
def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Remove the station ident and time from the front of the report list.

    The first item is always taken as the station. The next item is taken as
    the time when it is digits ending in "Z", or exactly six digits, in
    which case the missing "Z" is appended. The list is modified in place.

    Returns the list, the station (or None), and the time (or None).
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    report_time: str | None = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            report_time = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            report_time = f"{data.pop(0)}Z"
    return data, station, report_time
Return the report list and removed station ident and time strings.
def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element."""
    # Wind shear is never a normal wind element
    if text.startswith("WS"):
        return False
    # With a unit: 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            if not text.endswith(ending):
                continue
            unit_index = text.find(ending)
            if text[unit_index - 2 : unit_index].isdigit():
                return True
    # Without a unit: 09010 09010G15 VRB10
    has_gust_shape = len(text) >= 8 and "G" in text and "/" not in text
    if not (len(text) == 5 or has_gust_shape):
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()
Return True if the text is likely a normal wind element.
def is_variable_wind_direction(text: str) -> bool:
    """Return True if the first seven characters look like 350V040."""
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
Return True if element looks like 350V040.
def separate_wind(text: str) -> tuple[str, str, str]:
    """Extract the direction, speed, and gust from a wind element.

    The element is expected without its unit suffix, ex: 09010G15.
    Returns a (direction, speed, gust) tuple of strings; parts that are not
    present are returned as empty strings.
    """
    direction, speed, gust = "", "", ""
    # Remove gust
    if "G" in text:
        g_index = text.find("G")
        start, end = g_index + 1, g_index + 3
        # 16006GP99KT ie gust greater than
        if "GP" in text:
            end += 1
        # 27080G105 ie three-digit gust. Keep consuming digits so the extra
        # digit stays with the gust instead of being glued onto the speed
        while end < len(text) and text[end].isdigit():
            end += 1
        gust = text[start:end]
        text = text[:g_index] + text[end:]
    if text:
        # 10G18KT
        if len(text) == 2:
            speed = text
        else:
            direction = text[:3]
            speed = text[3:]
    return direction, speed, gust
Extract the direction, speed, and gust from a wind element.
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Return the report list, direction, speed, gust, and variable direction list.

    Wind items are popped off the front of data. When the wind element ends
    with a known unit, units.wind_speed is updated to match.
    """
    direction = speed = gust = ""
    variable: list[Number] = []
    # Main wind element: drop the unit, then split into its parts
    if data and is_wind(data[0]):
        wind = data.pop(0)
        for suffix, unit in WIND_UNITS.items():
            if wind.endswith(suffix):
                units.wind_speed = unit
                wind = wind.replace(suffix, "")
                break
        direction, speed, gust = separate_wind(wind)
    # Separated Gust: G15 reported as its own item
    if data:
        head = data[0]
        if 1 < len(head) < 4 and head[0] == "G" and head[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable Wind Direction: 350V040
    if data and is_variable_wind_direction(data[0]):
        bounds = (make_number(part, speak=part, literal=True) for part in data.pop(0).split("V"))
        variable.extend(bound for bound in bounds if bound is not None)
    # Convert to Number
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )
Return the report list plus the parsed direction, speed, and gust values and the variable direction list.
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Return the report list and the visibility parsed from its first item(s).

    Recognized items are popped off the front of data and units.visibility
    is set to "sm" or "m" to match the form that was found.
    """
    visibility = ""
    if data:
        item = data[0]
        # Vis reported in statute miles
        if item.endswith("SM"):
            miles = item[:-2]
            if miles.isdigit():  # 10SM
                visibility = str(int(miles))
            elif "/" in item:  # 1/2SM
                visibility = item[: item.find("SM")]
            else:
                visibility = miles
            data.pop(0)
            units.visibility = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            units.visibility = "m"
        # Four digits followed by M/N/S/E/W or NDV
        elif 5 <= len(item) <= 7 and item[:4].isdigit() and (item[4] in "MNSEW" or item[4:] == "NDV"):
            visibility = data.pop(0)[:4]
            units.visibility = "m"
        # Four digits with an M/P/B prefix
        elif len(item) == 5 and item[1:].isdigit() and item[0] in "MPB":
            visibility = data.pop(0)[1:]
            units.visibility = "m"
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit():
            whole = data.pop(0)  # 2
            fraction = data.pop(0).replace("SM", "")  # 1/2
            # Fold into an improper fraction: 5/2
            # NOTE: indexes a single-character numerator and denominator
            numerator = int(whole) * int(fraction[2]) + int(fraction[0])
            visibility = f"{numerator}{fraction[1:]}"
            units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)
Return the report list and removed visibility string.
def sanitize_cloud(cloud: str) -> str:
    """Repair rare formatting problems at the fourth character of a cloud layer."""
    if len(cloud) < 4:
        return cloud
    marker = cloud[3]
    # Digits, "/" and "-" are already fine in this position
    if marker.isdigit() or marker in ("/", "-"):
        return cloud
    head, tail = cloud[:3], cloud[4:]
    # Bad "O": FEWO03 -> FEW003
    if marker == "O":
        return f"{head}0{tail}"
    # Move modifiers to end: BKNC015 -> BKN015C
    if marker != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return head + tail + marker
    return cloud
Fix rare cloud layer issues.
def make_cloud(cloud: str) -> Cloud:
    """Build a Cloud dataclass from a cloud layer string.

    This function assumes the input is potentially valid.
    """
    raw = cloud
    layer = sanitize_cloud(cloud).replace("/", "")
    kind = ""
    base: str | None = None
    top: str | None = None
    # Separate top: everything after the first matching top marker
    for marker in _TOP_OFFSETS:
        position = layer.find(marker)
        if position != -1:
            top = layer[position + len(marker) :]
            layer = layer[:position]
            break
    # Separate type
    if layer.startswith("BASES"):  # BASES027
        layer = layer[5:]
    elif layer.startswith("BASE"):  # BASE027
        layer = layer[4:]
    elif layer.startswith("VV"):  # VV003
        kind, layer = layer[:2], layer[2:]
    elif len(layer) >= 3 and layer[:3] in CLOUD_LIST:  # FEW010
        kind, layer = layer[:3], layer[3:]
    # Second type joined by a dash: BKN-OVC065
    if len(layer) > 4 and layer[0] == "-" and layer[1:4] in CLOUD_LIST:
        kind += layer[:4]
        layer = layer[4:]
    # Separate base
    if len(layer) >= 3 and layer[:3].isdigit():
        base, layer = layer[:3], layer[3:]
    elif layer.startswith("UNKN"):
        layer = layer[4:]
    # Remainder is considered modifiers
    return Cloud(raw, kind or None, _null_or_int(base), _null_or_int(top), layer or None)
Return a Cloud dataclass for a cloud string.
This function assumes the input is potentially valid.
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Return the report list and removed list of split cloud layers.

    Items whose first three characters are in CLOUD_LIST, or whose first two
    are "VV", are popped from data and converted with make_cloud. The layers
    are returned sorted by (base, type) when those values can be compared,
    otherwise in their original report order.
    """
    clouds = []
    # Walk backwards so popping an item never shifts the indexes still to visit
    for i, item in reversed(list(enumerate(data))):
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(i)))
    # Attempt cloud sort. Fails if None values are present
    try:
        # sorted() works on a copy. An in-place sort that raises part way
        # through can leave the list partially reordered, which would make
        # the reverse below return a scrambled order
        clouds = sorted(clouds, key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        clouds.reverse()  # Restores original report order
    return data, clouds
Return the report list and removed list of split cloud layers.
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return the flight rules category for a parsed visibility and ceiling.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Reduce the visibility to one number for the threshold checks
    miles: _Numeric
    if visibility is None:
        miles = 2
    else:
        text = visibility.repr
        if text == "CAVOK" or text.startswith("P6"):
            miles = 10
        elif text.startswith("M"):
            miles = 0
        elif visibility.value is None:
            miles = 2
        elif len(text) == 4:
            # Four-character values are treated as meters and converted to miles
            miles = (visibility.value or 0) * 0.000621371
        else:
            miles = visibility.value or 0
    # A missing ceiling or base counts as 99, which is above every threshold
    ceiling_base = (ceiling.base if ceiling else 99) or 99
    # Check from the most restrictive category down
    if miles < 1 or ceiling_base < 5:
        return 3  # LIFR
    if miles < 3 or ceiling_base < 10:
        return 2  # IFR
    if miles <= 5 or ceiling_base <= 30:
        return 1  # MVFR
    return 0  # VFR
Return int based on current flight rules from parsed METAR data.
0=VFR, 1=MVFR, 2=IFR, 3=LIFR
Note: Common practice is to report no higher than IFR if visibility unavailable.
def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return the first layer that counts as a ceiling, or None.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (e.g. '' or 'FEW///')
    """
    for layer in clouds:
        if layer.base and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None
Return ceiling layer from Cloud-List or None if none found.
Assumes that the clouds are already sorted lowest to highest.
Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.
Prevents errors due to lack of cloud information (e.g. '' or 'FEW///')
def is_altitude(value: str) -> bool:
    """Return True if the value is a possible altitude.

    Accepts values starting with "SFC/", flight levels like FL180, and values
    whose part before the first "/" ends in three digits followed by "FT".
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/") or (value.startswith("FL") and value[2:5].isdigit()):
        return True
    # Only the part before the first slash is inspected: 1000FT/2000FT
    lower = value.split("/")[0]
    return lower.endswith("FT") and lower[-5:-2].isdigit()
Return True if the value is a possible altitude.
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing "FT" or "M" is stripped and recorded on units.altitude, which
    also cancels force_fl. A leading "F" followed by digits is expanded to
    "FL", and force_fl prefixes "FL" when it is not already there.

    Returns (None, units) for an empty value or a value that is nothing but a
    unit suffix; in that case units is left untouched.
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            stripped = value.removesuffix(end)
            # A bare unit such as "FT" leaves nothing to parse; bail out
            # before indexing into an empty string below
            if not stripped:
                return None, units
            force_fl = False
            units.altitude = end.lower()
            value = stripped
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units
Convert altitude string into a number.
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, assumes hhmm format and takes the day from the target date.

    The year and month come from target, or from the current UTC time when no
    target is given. This function assumes the given timestamp is within the
    hour threshold of that reference and moves the month otherwise.

    Returns None when the text is not a usable timestamp.
    """
    # Normalize the digits and find where the hour starts
    digits = date.strip("Z")
    if not digits.isdigit():
        return None
    if time_only:
        if len(digits) != 4:
            return None
        hour_at = 0
    else:
        # ddhh -> ddhh00
        if len(digits) == 4:
            digits += "00"
        if len(digits) != 6:
            return None
        hour_at = 2
    minute_at = hour_at + 2
    # Reference moment the guess is built from
    if target:
        anchor = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        anchor = dt.datetime.now(tz=dt.timezone.utc)
    day = anchor.day if time_only else int(digits[:2])
    hour = int(digits[hour_at:minute_at])
    # A day that doesn't exist in the reference month is tried in the previous
    # month. Remember the move so a month shift doesn't happen twice
    month_shifted = day > monthrange(anchor.year, anchor.month)[1]
    if month_shifted:
        anchor += relativedelta(months=-1)
    try:
        guess = anchor.replace(
            day=day,
            hour=hour % 24,
            minute=int(digits[minute_at : minute_at + 2]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        return None
    # Hours past 23 roll over into the next day
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not month_shifted:
        hourdiff = (guess - anchor) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            guess += relativedelta(months=+1)
    return guess
Parse a report timestamp in ddhhZ or ddhhmmZ format.
If time_only, assumes hhmm format with current or previous day.
This function assumes the given timestamp is within the hour threshold from current date.
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Build a Timestamp dataclass from a report timestamp in ddhhZ or ddhhmmZ format.

    Returns None for an empty or missing timestamp. Otherwise the Timestamp
    holds the original text and whatever parse_date returns for it.
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None
Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.
def is_runway_visibility(item: str) -> bool:
    """Return True if the item is a runway visibility range string."""
    if len(item) <= 4 or item[0] != "R":
        return False
    # The slash follows the runway number directly or after one extra character
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 is a runway state, not a visibility range
    return item[1:3].isdigit() and "CLRD" not in item
Return True if the item is a runway visibility range string.