avwx.parsing.core
Contains the core parsing and indent functions of avwx.
1"""Contains the core parsing and indent functions of avwx.""" 2 3# stdlib 4from __future__ import annotations 5 6import datetime as dt 7import math 8import re 9from calendar import monthrange 10from contextlib import suppress 11from copy import copy 12from typing import TYPE_CHECKING, Any 13 14# library 15from dateutil.relativedelta import relativedelta 16 17# module 18from avwx.static.core import ( 19 CARDINALS, 20 CLOUD_LIST, 21 FRACTIONS, 22 NUMBER_REPL, 23 SPECIAL_NUMBERS, 24 WIND_UNITS, 25) 26from avwx.structs import Cloud, Fraction, Number, Timestamp, Units 27 28if TYPE_CHECKING: 29 from collections.abc import Iterable 30 31 32def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]: 33 """Deduplicate a list while keeping order. 34 35 If only_neighbors is True, dedupe will only check neighboring values. 36 """ 37 ret: list[Any] = [] 38 for item in items: 39 if (only_neighbors and ret and ret[-1] != item) or item not in ret: 40 ret.append(item) 41 return ret 42 43 44def is_unknown(value: str) -> bool: 45 """Return True if val represents and unknown value.""" 46 if not isinstance(value, str): 47 raise TypeError 48 if not value or value.upper() in {"UNKN", "UNK", "UKN"}: 49 return True 50 for char in value: 51 if char not in ("/", "X", "."): 52 break 53 else: 54 return True 55 return False 56 57 58def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]: 59 """Return a list of items removed from a given list of strings 60 that are all digits from 'from_index' until hitting a non-digit item. 
61 """ 62 ret = [] 63 data.pop(from_index) 64 while len(data) > from_index and data[from_index].isdigit(): 65 ret.append(data.pop(from_index)) 66 return data, ret 67 68 69def unpack_fraction(num: str) -> str: 70 """Return unpacked fraction string 5/2 -> 2 1/2.""" 71 numbers = [int(n) for n in num.split("/") if n] 72 if len(numbers) != 2 or numbers[0] <= numbers[1]: 73 return num 74 numerator, denominator = numbers 75 over = numerator // denominator 76 rem = numerator % denominator 77 return f"{over} {rem}/{denominator}" 78 79 80def remove_leading_zeros(num: str) -> str: 81 """Strip zeros while handling -, M, and empty strings.""" 82 if not num: 83 return num 84 if num.startswith("M"): 85 ret = "M" + num[1:].lstrip("0") 86 elif num.startswith("-"): 87 ret = "-" + num[1:].lstrip("0") 88 else: 89 ret = num.lstrip("0") 90 return "0" if ret in ("", "M", "-") else ret 91 92 93SPOKEN_POSTFIX = ( 94 (" zero zero zero", " thousand"), 95 (" zero zero", " hundred"), 96) 97 98 99def spoken_number(num: str, *, literal: bool = False) -> str: 100 """Return the spoken version of a number. 
101 102 If literal, no conversion to hundreds/thousands 103 104 Ex: 1.2 -> one point two 105 1 1/2 -> one and one half 106 25000 -> two five thousand 107 """ 108 ret = [] 109 for part in num.split(): 110 if part in FRACTIONS: 111 ret.append(FRACTIONS[part]) 112 else: 113 val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL) 114 if not literal: 115 for target, replacement in SPOKEN_POSTFIX: 116 if val.endswith(target): 117 val = val[: -len(target)] + replacement 118 ret.append(val) 119 return " and ".join(ret) 120 121 122def make_fraction( 123 num: str, 124 repr: str | None = None, # noqa: A002 125 *, 126 literal: bool = False, 127 speak_prefix: str = "", 128) -> Fraction: 129 """Return a fraction dataclass for numbers with / in them.""" 130 num_str, den_str = num.split("/") 131 # 2-1/2 but not -2 1/2 132 if "-" in num_str and not num_str.startswith("-"): 133 num_str = num_str.replace("-", " ") 134 denominator = int(den_str) 135 # Multiply multi-digit numerator 136 if len(num_str) > 1: 137 numerator = int(num_str[:-1]) * denominator + int(num_str[-1]) 138 num = f"{numerator}/{denominator}" 139 else: 140 numerator = int(num_str) 141 value = numerator / denominator 142 unpacked = unpack_fraction(num) 143 spoken = speak_prefix + spoken_number(unpacked, literal=literal) 144 return Fraction(repr or num, value, spoken, numerator, denominator, unpacked) 145 146 147def make_number( 148 num: str | None, 149 repr: str | None = None, # noqa: A002 150 speak: str | None = None, 151 *, 152 literal: bool = False, 153 special: dict | None = None, 154 m_minus: bool = True, 155) -> Number | Fraction | None: 156 """Return a Number or Fraction dataclass for a number string. 157 158 If literal, spoken string will not convert to hundreds/thousands. 159 160 NOTE: Numerators are assumed to have a single digit. Additional are whole numbers. 
161 """ 162 if not num or is_unknown(num): 163 return None 164 # Check special 165 with suppress(KeyError): 166 item = (special or {}).get(num) or SPECIAL_NUMBERS[num] 167 if isinstance(item, tuple): 168 value, spoken = item 169 else: 170 value = item 171 spoken = spoken_number(str(value), literal=literal) 172 return Number(repr or num, value, spoken) 173 # Check cardinal direction 174 if num in CARDINALS: 175 if not repr: 176 repr = num # noqa: A001 177 num = str(CARDINALS[num]) 178 val_str = num 179 # Remove unit suffixes 180 if val_str.endswith("SM"): 181 repr = val_str[:] # noqa: A001 182 val_str = val_str[:-2] 183 # Remove spurious characters from the end 184 num = num.rstrip("M.") 185 num = num.replace("O", "0") 186 num = num.replace("+", "") 187 num = num.replace(",", "") 188 # Handle Minus values with errors like 0M04 189 if m_minus and "M" in num: 190 val_str = num.replace("MM", "-").replace("M", "-") 191 while val_str[0] != "-": 192 val_str = val_str[1:] 193 # Check value prefixes 194 speak_prefix = "" 195 if val_str.startswith("ABV "): 196 speak_prefix += "above " 197 val_str = val_str[4:] 198 if val_str.startswith("BLW "): 199 speak_prefix += "below " 200 val_str = val_str[4:] 201 if val_str.startswith("FL"): 202 speak_prefix += "flight level " 203 val_str, literal = val_str[2:], True 204 if val_str.startswith("M"): 205 speak_prefix += "less than " 206 repr = repr or val_str # noqa: A001 207 val_str = val_str[1:] 208 if val_str.startswith("P"): 209 speak_prefix += "greater than " 210 repr = repr or val_str # noqa: A001 211 val_str = val_str[1:] 212 # Create Number 213 if not val_str: 214 return None 215 ret: Number | Fraction | None = None 216 # Create Fraction 217 if "/" in val_str: 218 ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix) 219 else: 220 val_str = val_str.replace(",", "") 221 # Overwrite float 0 due to "0.0" literal 222 value = float(val_str) or 0 if "." 
in num else int(val_str) 223 spoken = speak_prefix + spoken_number(speak or str(value), literal=literal) 224 ret = Number(repr or num, value, spoken) 225 # Null the value if "greater than"/"less than" 226 if ret and not m_minus and repr and repr.startswith(("M", "P")): 227 ret.value = None 228 return ret 229 230 231def find_first_in_list(txt: str, str_list: list[str]) -> int: 232 """Return the index of the earliest occurrence of an item from a list in a string. 233 234 Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3 235 """ 236 start = len(txt) + 1 237 for item in str_list: 238 if start > txt.find(item) > -1: 239 start = txt.find(item) 240 return start if len(txt) + 1 > start > -1 else -1 241 242 243def is_timestamp(item: str) -> bool: 244 """Return True if the item matches the timestamp format.""" 245 return len(item) == 7 and item[-1] == "Z" and item[:-1].isdigit() 246 247 248def is_timerange(item: str) -> bool: 249 """Return True if the item is a TAF to-from time range.""" 250 return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit() 251 252 253def is_possible_temp(temp: str) -> bool: 254 """Return True if all characters are digits or 'M' for minus.""" 255 return all((char.isdigit() or char == "M") for char in temp) 256 257 258_Numeric = int | float 259 260 261def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float: 262 """Calculate the relative humidity as a 0 to 1 percentage.""" 263 264 def saturation(value: _Numeric) -> float: 265 """Return the saturation vapor pressure without the C constant for humidity calc.""" 266 return math.exp((17.67 * value) / (243.5 + value)) 267 268 if unit == "F": 269 dewpoint = (dewpoint - 32) * 5 / 9 270 temperature = (temperature - 32) * 5 / 9 271 return saturation(dewpoint) / saturation(temperature) 272 273 274# https://aviation.stackexchange.com/questions/47971/how-do-i-calculate-density-altitude-by-hand 275 276 277def pressure_altitude(pressure: float, 
altitude: _Numeric, unit: str = "inHg") -> int: 278 """Calculate the pressure altitude in feet. Converts pressure units.""" 279 if unit == "hPa": 280 pressure *= 0.02953 281 return round((29.92 - pressure) * 1000 + altitude) 282 283 284def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int: 285 """Calculate the density altitude in feet. Converts pressure and temperature units.""" 286 if units.temperature == "F": 287 temperature = (temperature - 32) * 5 / 9 288 if units.altimeter == "hPa": 289 pressure *= 0.02953 290 pressure_alt = pressure_altitude(pressure, altitude) 291 standard = 15 - (2 * altitude / 1000) 292 return round(((temperature - standard) * 120) + pressure_alt) 293 294 295def get_station_and_time( 296 data: list[str], 297) -> tuple[list[str], str | None, str | None]: 298 """Return the report list and removed station ident and time strings.""" 299 if not data: 300 return data, None, None 301 station = data.pop(0) 302 if not data: 303 return data, station, None 304 q_time, r_time = data[0], None 305 if data and q_time.endswith("Z") and q_time[:-1].isdigit(): 306 r_time = data.pop(0) 307 elif data and len(q_time) == 6 and q_time.isdigit(): 308 r_time = f"{data.pop(0)}Z" 309 return data, station, r_time 310 311 312def is_wind(text: str) -> bool: 313 """Return True if the text is likely a normal wind element.""" 314 # Ignore wind shear 315 if text.startswith("WS"): 316 return False 317 # 09010KT, 09010G15KT 318 if len(text) > 4: 319 for ending in WIND_UNITS: 320 unit_index = text.find(ending) 321 if text.endswith(ending) and text[unit_index - 2 : unit_index].isdigit(): 322 return True 323 # 09010 09010G15 VRB10 324 if len(text) != 5 and (len(text) < 8 or "G" not in text or "/" in text): 325 return False 326 return text[:5].isdigit() or (text.startswith("VRB") and text[3:5].isdigit()) 327 328 329VARIABLE_DIRECTION_PATTERN = re.compile(r"\d{3}V\d{3}") 330 331 332def is_variable_wind_direction(text: str) -> bool: 
333 """Return True if element looks like 350V040.""" 334 if len(text) < 7: 335 return False 336 return VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None 337 338 339def separate_wind(text: str) -> tuple[str, str, str]: 340 """Extract the direction, speed, and gust from a wind element.""" 341 direction, speed, gust = "", "", "" 342 # Remove gust 343 if "G" in text: 344 g_index = text.find("G") 345 start, end = g_index + 1, g_index + 3 346 # 16006GP99KT ie gust greater than 347 if "GP" in text: 348 end += 1 349 gust = text[start:end] 350 text = text[:g_index] + text[end:] 351 if text: 352 # 10G18KT 353 if len(text) == 2: 354 speed = text 355 else: 356 direction = text[:3] 357 speed = text[3:] 358 return direction, speed, gust 359 360 361def get_wind( 362 data: list[str], units: Units 363) -> tuple[ 364 list[str], 365 Number | None, 366 Number | None, 367 Number | None, 368 list[Number], 369]: 370 """Return the report list, direction string, speed string, gust string, and variable direction list.""" 371 direction, speed, gust = "", "", "" 372 variable: list[Number] = [] 373 # Remove unit and split elements 374 if data: 375 item = copy(data[0]) 376 if is_wind(item): 377 for key, unit in WIND_UNITS.items(): 378 if item.endswith(key): 379 units.wind_speed = unit 380 item = item.replace(key, "") 381 break 382 direction, speed, gust = separate_wind(item) 383 data.pop(0) 384 # Separated Gust 385 if data and 1 < len(data[0]) < 4 and data[0][0] == "G" and data[0][1:].isdigit(): 386 gust = data.pop(0)[1:] 387 # Variable Wind Direction 388 if data and is_variable_wind_direction(data[0]): 389 for item in data.pop(0).split("V"): 390 value = make_number(item, speak=item, literal=True) 391 if value is not None: 392 variable.append(value) 393 # Convert to Number 394 direction_value = make_number(direction, speak=direction, literal=True) 395 speed_value = make_number(speed.strip("BV"), m_minus=False) 396 gust_value = make_number(gust, m_minus=False) 397 return data, 
direction_value, speed_value, gust_value, variable 398 399 400def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]: 401 """Return the report list and removed visibility string.""" 402 visibility = "" 403 if data: 404 item = copy(data[0]) 405 # Vis reported in statue miles 406 if item.endswith("SM"): # 10SM 407 if item[:-2].isdigit(): 408 visibility = str(int(item[:-2])) 409 elif "/" in item: 410 visibility = item[: item.find("SM")] # 1/2SM 411 else: 412 visibility = item[:-2] 413 data.pop(0) 414 units.visibility = "sm" 415 # Vis reported in meters 416 elif len(item) == 4 and item.isdigit(): 417 visibility = data.pop(0) 418 units.visibility = "m" 419 elif 7 >= len(item) >= 5 and item[:4].isdigit() and (item[4] in ["M", "N", "S", "E", "W"] or item[4:] == "NDV"): 420 visibility = data.pop(0)[:4] 421 units.visibility = "m" 422 elif len(item) == 5 and item[1:].isdigit() and item[0] in ["M", "P", "B"]: 423 visibility = data.pop(0)[1:] 424 units.visibility = "m" 425 elif item.endswith("KM"): 426 visibility = f"{item[:-2]}000" 427 data.pop(0) 428 units.visibility = "m" 429 # Vis statute miles but split Ex: 2 1/2SM 430 elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit(): 431 vis1 = data.pop(0) # 2 432 vis2 = data.pop(0).replace("SM", "") # 1/2 433 visibility = str(int(vis1) * int(vis2[2]) + int(vis2[0])) + vis2[1:] # 5/2 434 units.visibility = "sm" 435 return data, make_number(visibility, m_minus=False) 436 437 438def sanitize_cloud(cloud: str) -> str: 439 """Fix rare cloud layer issues.""" 440 if len(cloud) < 4: 441 return cloud 442 if not cloud[3].isdigit() and cloud[3] not in ("/", "-"): 443 # Bad "O": FEWO03 -> FEW003 444 if cloud[3] == "O": 445 cloud = f"{cloud[:3]}0{cloud[4:]}" 446 # Move modifiers to end: BKNC015 -> BKN015C 447 elif cloud[3] != "U" and cloud[:4] not in {"BASE", "UNKN"}: 448 cloud = cloud[:3] + cloud[4:] + cloud[3] 449 return cloud 450 451 452def _null_or_int(val: str | None) -> int | 
None: 453 """Nullify unknown elements and convert ints.""" 454 return None if not isinstance(val, str) or is_unknown(val) else int(val) 455 456 457_TOP_OFFSETS = ("-TOPS", "-TOP") 458 459 460def make_cloud(cloud: str) -> Cloud: 461 """Return a Cloud dataclass for a cloud string. 462 463 This function assumes the input is potentially valid. 464 """ 465 raw_cloud = cloud 466 cloud_type = "" 467 base: str | None = None 468 top: str | None = None 469 cloud = sanitize_cloud(cloud).replace("/", "") 470 # Separate top 471 for target in _TOP_OFFSETS: 472 topi = cloud.find(target) 473 if topi > -1: 474 top, cloud = cloud[topi + len(target) :], cloud[:topi] 475 break 476 # Separate type 477 ## BASE027 478 if cloud.startswith("BASES"): 479 cloud = cloud[5:] 480 elif cloud.startswith("BASE"): 481 cloud = cloud[4:] 482 ## VV003 483 elif cloud.startswith("VV"): 484 cloud_type, cloud = cloud[:2], cloud[2:] 485 ## FEW010 486 elif len(cloud) >= 3 and cloud[:3] in CLOUD_LIST: 487 cloud_type, cloud = cloud[:3], cloud[3:] 488 ## BKN-OVC065 489 if len(cloud) > 4 and cloud[0] == "-" and cloud[1:4] in CLOUD_LIST: 490 cloud_type += cloud[:4] 491 cloud = cloud[4:] 492 # Separate base 493 if len(cloud) >= 3 and cloud[:3].isdigit(): 494 base, cloud = cloud[:3], cloud[3:] 495 elif len(cloud) >= 4 and cloud[:4] == "UNKN": 496 cloud = cloud[4:] 497 # Remainder is considered modifiers 498 modifier = cloud or None 499 # Make Cloud 500 return Cloud(raw_cloud, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier) 501 502 503def get_clouds(data: list[str]) -> tuple[list[str], list]: 504 """Return the report list and removed list of split cloud layers.""" 505 clouds = [] 506 for i, item in reversed(list(enumerate(data))): 507 if item[:3] in CLOUD_LIST or item[:2] == "VV": 508 cloud = data.pop(i) 509 clouds.append(make_cloud(cloud)) 510 # Attempt cloud sort. 
Fails if None values are present 511 try: 512 clouds.sort(key=lambda cloud: (cloud.base, cloud.type)) 513 except TypeError: 514 clouds.reverse() # Restores original report order 515 return data, clouds 516 517 518def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int: 519 """Return int based on current flight rules from parsed METAR data. 520 521 0=VFR, 1=MVFR, 2=IFR, 3=LIFR 522 523 Note: Common practice is to report no higher than IFR if visibility unavailable. 524 """ 525 # Parse visibility 526 vis: _Numeric 527 if visibility is None: 528 vis = 2 529 elif visibility.repr == "CAVOK" or visibility.repr.startswith("P6"): 530 vis = 10 531 elif visibility.repr.startswith("M"): 532 vis = 0 533 elif visibility.value is None: 534 vis = 2 535 # Convert meters to miles 536 elif len(visibility.repr) == 4: 537 vis = (visibility.value or 0) * 0.000621371 538 else: 539 vis = visibility.value or 0 540 # Parse ceiling 541 cld = (ceiling.base if ceiling else 99) or 99 542 # Determine flight rules 543 if (vis <= 5) or (cld <= 30): 544 if (vis < 3) or (cld < 10): 545 if (vis < 1) or (cld < 5): 546 return 3 # LIFR 547 return 2 # IFR 548 return 1 # MVFR 549 return 0 # VFR 550 551 552def get_ceiling(clouds: list[Cloud]) -> Cloud | None: 553 """Return ceiling layer from Cloud-List or None if none found. 554 555 Assumes that the clouds are already sorted lowest to highest. 556 557 Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings. 558 559 Prevents errors due to lack of cloud information (eg. 
'' or 'FEW///') 560 """ 561 return next((c for c in clouds if c.base and c.type in {"OVC", "BKN", "VV"}), None) 562 563 564def is_altitude(value: str) -> bool: 565 """Return True if the value is a possible altitude.""" 566 if len(value) < 5: 567 return False 568 if value.startswith("SFC/"): 569 return True 570 if value.startswith("FL") and value[2:5].isdigit(): 571 return True 572 first, *_ = value.split("/") 573 return bool(first[-2:] == "FT" and first[-5:-2].isdigit()) 574 575 576def make_altitude( 577 value: str, 578 units: Units, 579 repr: str | None = None, # noqa: A002 580 *, 581 force_fl: bool = False, 582) -> tuple[Number | None, Units]: 583 """Convert altitude string into a number.""" 584 if not value: 585 return None, units 586 raw = repr or value 587 for end in ("FT", "M"): 588 if value.endswith(end): 589 force_fl = False 590 units.altitude = end.lower() 591 value = value.removesuffix(end) 592 # F430 593 if value[0] == "F" and value[1:].isdigit(): 594 value = f"FL{value[1:]}" 595 if force_fl and value[:2] != "FL": 596 value = f"FL{value}" 597 return make_number(value, repr=raw), units 598 599 600def parse_date( 601 date: str, 602 hour_threshold: int = 200, 603 *, 604 time_only: bool = False, 605 target: dt.date | None = None, 606) -> dt.datetime | None: 607 """Parse a report timestamp in ddhhZ or ddhhmmZ format. 608 609 If time_only, assumes hhmm format with current or previous day. 610 611 This function assumes the given timestamp is within the hour threshold from current date. 
612 """ 613 # Format date string 614 date = date.strip("Z") 615 if not date.isdigit(): 616 return None 617 if time_only: 618 if len(date) != 4: 619 return None 620 index_hour = 0 621 else: 622 if len(date) == 4: 623 date += "00" 624 if len(date) != 6: 625 return None 626 index_hour = 2 627 # Create initial guess 628 if target: 629 target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc) 630 else: 631 target = dt.datetime.now(tz=dt.timezone.utc) 632 day = target.day if time_only else int(date[:2]) 633 hour = int(date[index_hour : index_hour + 2]) 634 # Handle situation where next month has less days than current month 635 # Shifted value makes sure that a month shift doesn't happen twice 636 shifted = False 637 if day > monthrange(target.year, target.month)[1]: 638 target += relativedelta(months=-1) 639 shifted = True 640 try: 641 guess = target.replace( 642 day=day, 643 hour=hour % 24, 644 minute=int(date[index_hour + 2 : index_hour + 4]) % 60, 645 second=0, 646 microsecond=0, 647 ) 648 except ValueError: 649 return None 650 # Handle overflow hour 651 if hour > 23: 652 guess += dt.timedelta(days=1) 653 # Handle changing months if not already shifted 654 if not shifted: 655 hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60 656 if hourdiff > hour_threshold: 657 guess += relativedelta(months=-1) 658 elif hourdiff < -hour_threshold: 659 guess += relativedelta(months=+1) 660 return guess 661 662 663def make_timestamp( 664 timestamp: str | None, 665 *, 666 time_only: bool = False, 667 target_date: dt.date | None = None, 668) -> Timestamp | None: 669 """Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.""" 670 if not timestamp: 671 return None 672 date_obj = parse_date(timestamp, time_only=time_only, target=target_date) 673 return Timestamp(timestamp, date_obj) 674 675 676def is_runway_visibility(item: str) -> bool: 677 """Return True if the item is a runway visibility range string.""" 678 return ( 679 
len(item) > 4 680 and item[0] == "R" 681 and (item[3] == "/" or item[4] == "/") 682 and item[1:3].isdigit() 683 and "CLRD" not in item # R28/CLRD70 Runway State 684 )
def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
    """Return the items with duplicates removed, keeping first-seen order.

    With only_neighbors set, an item is only dropped when it equals the item
    kept immediately before it, so a value may reappear later in the result.
    """
    unique: list[Any] = []
    for candidate in items:
        differs_from_last = only_neighbors and unique and unique[-1] != candidate
        if differs_from_last or candidate not in unique:
            unique.append(candidate)
    return unique
Deduplicate a list while keeping order.
If only_neighbors is True, dedupe will only check neighboring values.
def is_unknown(value: str) -> bool:
    """Return True if the value represents an unknown value.

    Unknown values are the empty string, UNKN / UNK / UKN in any letter case,
    and any string built only from "/", "X", and "." characters.

    Raises TypeError when value is not a string.
    """
    if not isinstance(value, str):
        raise TypeError
    if value == "" or value.upper() in {"UNKN", "UNK", "UKN"}:
        return True
    # Placeholder fills like "///" or "XX" consist only of these characters
    return all(char in ("/", "X", ".") for char in value)
Return True if val represents an unknown value.
def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
    """Remove and collect the run of all-digit items that follows 'from_index'.

    The item at 'from_index' is discarded first. Every item that then lands at
    that position is removed and collected until a non-digit item or the end of
    the list is reached. Returns the mutated list and the collected items.
    """
    del data[from_index]
    digits: list[str] = []
    while from_index < len(data):
        if not data[from_index].isdigit():
            break
        digits.append(data.pop(from_index))
    return data, digits
Return a list of items removed from a given list of strings that are all digits from 'from_index' until hitting a non-digit item.
def unpack_fraction(num: str) -> str:
    """Return unpacked fraction string 5/2 -> 2 1/2.

    The input is returned unchanged when it is not an improper
    "numerator/denominator" pair, or when the denominator is not positive.
    Fractions that divide evenly return only the whole number: 4/2 -> 2.

    Raises ValueError when a part around the slash is not an integer.
    """
    numbers = [int(n) for n in num.split("/") if n]
    if len(numbers) != 2:
        return num
    numerator, denominator = numbers
    # Nothing sensible to unpack for proper fractions or invalid denominators
    if denominator <= 0 or numerator <= denominator:
        return num
    whole, remainder = divmod(numerator, denominator)
    if remainder == 0:
        # Avoid emitting an empty fraction like "2 0/2"
        return str(whole)
    return f"{whole} {remainder}/{denominator}"
Return unpacked fraction string 5/2 -> 2 1/2.
def remove_leading_zeros(num: str) -> str:
    """Strip leading zeros while keeping a leading "M" or "-" marker.

    Empty strings are returned as-is. A value with no characters left after
    stripping, apart from the marker, becomes "0".
    """
    if not num:
        return num
    marker = num[0] if num[0] in ("M", "-") else ""
    stripped = marker + num[len(marker) :].lstrip("0")
    if stripped in ("", "M", "-"):
        return "0"
    return stripped
Strip zeros while handling -, M, and empty strings.
def spoken_number(num: str, *, literal: bool = False) -> str:
    """Return the spoken version of a number.

    The string is split on whitespace and each part is spoken on its own,
    with the parts joined by " and ". A part found in FRACTIONS uses that
    entry. Any other part is spelled character by character via NUMBER_REPL,
    dropping characters that have no replacement.

    If literal, trailing zeros are not converted to hundreds/thousands.

    Ex: 1.2 -> one point two
        1 1/2 -> one and one half
        25000 -> two five thousand
    """

    def speak_part(part: str) -> str:
        """Return the spoken form of one whitespace-separated part."""
        if part in FRACTIONS:
            return FRACTIONS[part]
        words = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
        if literal:
            return words
        for suffix, replacement in SPOKEN_POSTFIX:
            if words.endswith(suffix):
                words = words[: -len(suffix)] + replacement
        return words

    return " and ".join(speak_part(part) for part in num.split())
Return the spoken version of a number.
If literal, no conversion to hundreds/thousands
Ex: 1.2 -> one point two; 1 1/2 -> one and one half; 25000 -> two five thousand
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a Fraction dataclass for a number string containing one "/".

    Only the last character before the slash is the numerator. Anything in
    front of it is the whole-number part, optionally separated by a space or
    a dash. When a whole part is present, the stored string is rewritten as an
    improper fraction.

    speak_prefix is prepended to the spoken text, and literal is passed on to
    spoken_number.
    """
    top, bottom = num.split("/")
    # 2-1/2 uses the dash as a separator, while -2 1/2 is a sign
    if not top.startswith("-") and "-" in top:
        top = top.replace("-", " ")
    denominator = int(bottom)
    if len(top) <= 1:
        numerator = int(top)
    else:
        # Fold the whole-number part into the numerator
        whole, single = top[:-1], top[-1]
        numerator = int(whole) * denominator + int(single)
        num = f"{numerator}/{denominator}"
    value = numerator / denominator
    normalized = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(normalized, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, normalized)
Return a fraction dataclass for numbers with / in them.
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    Returns None when num is empty, is an unknown placeholder, or has nothing
    left once its prefixes are removed.

    repr overrides the stored representation. speak overrides the text used
    to build the spoken string of a non-fraction value.

    If literal, spoken string will not convert to hundreds/thousands.

    special maps strings to a value or a (value, spoken) tuple. It is checked
    before SPECIAL_NUMBERS, and falsy entries fall through to SPECIAL_NUMBERS.

    If m_minus, "M" is read as a minus sign. Otherwise the value is set to
    None when the representation starts with "M" or "P".

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special
    # A missing key raises KeyError, which is suppressed to continue below
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    # val_str is the text that gets parsed; num is kept for the representation
    val_str = num
    # Remove unit suffixes
    if val_str.endswith("SM"):
        # The full text including the unit replaces any repr that was passed in
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE: these clean-ups change num only; val_str picks them up solely
    # through the minus handling below
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        # Drop everything in front of the first minus sign
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        # Flight levels are always spoken digit by digit
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        val_str = val_str.replace(",", "")
        # Overwrite float 0 due to "0.0" literal
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret
Return a Number or Fraction dataclass for a number string.
If literal, spoken string will not convert to hundreds/thousands.
NOTE: Numerators are assumed to have a single digit. Any additional leading digits are treated as the whole-number part.
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the lowest index at which any item of the list occurs in the string.

    Returns -1 when no item occurs in the string or the list is empty.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = (txt.find(item) for item in str_list)
    return min((pos for pos in positions if pos > -1), default=-1)
Return the index of the earliest occurrence of an item from a list in a string.
Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
def is_timestamp(item: str) -> bool:
    """Return True if the item is six digits followed by "Z"."""
    if len(item) != 7:
        return False
    digits, suffix = item[:-1], item[-1]
    return suffix == "Z" and digits.isdigit()
Return True if the item matches the timestamp format.
def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF to-from time range.

    The expected shape is four digits, a slash, and four more digits.
    """
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()
Return True if the item is a TAF to-from time range.
def is_possible_temp(temp: str) -> bool:
    """Return True if no character is anything other than a digit or 'M' for minus.

    An empty string therefore returns True.
    """
    return not any(char != "M" and not char.isdigit() for char in temp)
Return True if all characters are digits or 'M' for minus.
def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
    """Calculate the relative humidity as a 0 to 1 fraction.

    Inputs are taken as Celsius unless unit is "F", in which case both values
    are converted first. The result is the ratio of the saturation vapor
    pressure terms for the dewpoint and the temperature. The shared constant
    cancels out, so it is left off.
    """
    if unit == "F":
        dewpoint = (dewpoint - 32) * 5 / 9
        temperature = (temperature - 32) * 5 / 9
    dewpoint_term = math.exp((17.67 * dewpoint) / (243.5 + dewpoint))
    temperature_term = math.exp((17.67 * temperature) / (243.5 + temperature))
    return dewpoint_term / temperature_term
Calculate the relative humidity as a 0 to 1 percentage.
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude in feet, rounded to an int.

    Pressure is read as inHg unless unit is "hPa", in which case it is
    converted first. Each inHg of difference from 29.92 adds 1000 to altitude.
    """
    inches = pressure * 0.02953 if unit == "hPa" else pressure
    return round((29.92 - inches) * 1000 + altitude)
Calculate the pressure altitude in feet. Converts pressure units.
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude in feet, rounded to an int.

    Temperature is converted to Celsius when units.temperature is "F", and
    pressure to inHg when units.altimeter is "hPa". The pressure altitude is
    then adjusted by 120 for every degree the temperature differs from the
    standard value for the given altitude.
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    inches = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    # Standard temperature drops 2 degrees per 1000 of altitude from 15
    standard_temperature = 15 - (2 * altitude / 1000)
    deviation = (celsius - standard_temperature) * 120
    return round(deviation + pressure_altitude(inches, altitude))
Calculate the density altitude in feet. Converts pressure and temperature units.
def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Return the report list with the station ident and time string removed.

    The first item is always taken as the station. The next item is taken as
    the time when it is digits followed by "Z", or exactly six digits, in
    which case the missing "Z" is added. Either value is None when absent.
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    timestamp: str | None = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            timestamp = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            timestamp = data.pop(0) + "Z"
    return data, station, timestamp
Return the report list and removed station ident and time strings.
def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element.

    Wind shear elements starting with "WS" never match. Text ending in a known
    wind unit matches when two digits sit directly before the unit. Unit-less
    text matches in forms such as 09010, 09010G15, and VRB10.
    """
    # Ignore wind shear
    if text.startswith("WS"):
        return False
    # 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            if not text.endswith(ending):
                continue
            unit_index = text.find(ending)
            if text[unit_index - 2 : unit_index].isdigit():
                return True
    # 09010 09010G15 VRB10
    has_gust_shape = len(text) >= 8 and "G" in text and "/" not in text
    if len(text) != 5 and not has_gust_shape:
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()
Return True if the text is likely a normal wind element.
def is_variable_wind_direction(text: str) -> bool:
    """Return True if the first seven characters look like 350V040.

    Anything shorter than seven characters is rejected without matching.
    """
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
Return True if element looks like 350V040.
def separate_wind(text: str) -> tuple[str, str, str]:
    """Extract the direction, speed, and gust from a wind element.

    The gust is the text following the first "G": two characters (three when
    the element contains "GP"), extended over any digits that directly follow
    so that three digit gusts such as 240100G130 are kept whole. What remains
    is split into a three character direction and the speed; a two character
    remainder is treated as a speed only.

    Returns (direction, speed, gust) with empty strings for missing parts.
    """
    direction, speed, gust = "", "", ""
    # Remove gust
    g_index = text.find("G")
    if g_index > -1:
        start, end = g_index + 1, g_index + 3
        # 16006GP99KT ie gust greater than
        if "GP" in text:
            end += 1
        # Gusts of 100 or more are reported with three digits: 240100G130
        while end < len(text) and text[end].isdigit():
            end += 1
        gust = text[start:end]
        text = text[:g_index] + text[end:]
    if text:
        # 10G18 -> only a speed is left
        if len(text) == 2:
            speed = text
        else:
            direction, speed = text[:3], text[3:]
    return direction, speed, gust
Extract the direction, speed, and gust from a wind element.
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Pop the wind elements off the front of the report and convert them.

    Returns the remaining report list, the direction, speed, and gust as made
    by make_number, and the list of variable direction values.
    Sets units.wind_speed when the wind element ends with a known unit.
    """
    direction = speed = gust = ""
    variable: list[Number] = []
    # Main wind element: strip the unit, then split into its parts
    if data:
        wind = copy(data[0])
        if is_wind(wind):
            for suffix, unit in WIND_UNITS.items():
                if wind.endswith(suffix):
                    units.wind_speed = unit
                    wind = wind.replace(suffix, "")
                    break
            direction, speed, gust = separate_wind(wind)
            data.pop(0)
    # Gust reported as its own element: G25
    if data:
        head = data[0]
        if 1 < len(head) < 4 and head[0] == "G" and head[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable wind direction: 350V040
    if data and is_variable_wind_direction(data[0]):
        for part in data.pop(0).split("V"):
            number = make_number(part, speak=part, literal=True)
            if number is not None:
                variable.append(number)
    # Convert the collected strings
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )
Return the report list, the parsed direction, speed, and gust values (Number or None), and the variable direction list.
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Pop the visibility element(s) off the front of the report and convert them.

    Handles statute miles (10SM, 1/2SM, split 2 1/2SM), four digit meters with
    an optional leading modifier or trailing direction, and kilometers.
    Sets units.visibility to "sm" or "m" when a value is found.
    """
    raw = ""
    if data:
        head = copy(data[0])
        size = len(head)
        # Statute miles in one element
        if head.endswith("SM"):
            miles = head[:-2]
            if miles.isdigit():  # 10SM
                raw = str(int(miles))
            elif "/" in head:  # 1/2SM
                raw = head[: head.find("SM")]
            else:
                raw = miles
            data.pop(0)
            units.visibility = "sm"
        # Meters: 9999
        elif size == 4 and head.isdigit():
            raw = data.pop(0)
            units.visibility = "m"
        # Meters followed by a direction letter or NDV
        elif (
            5 <= size <= 7
            and head[:4].isdigit()
            and (head[4] in ("M", "N", "S", "E", "W") or head[4:] == "NDV")
        ):
            raw = data.pop(0)[:4]
            units.visibility = "m"
        # Meters preceded by a single letter modifier
        elif size == 5 and head[1:].isdigit() and head[0] in ("M", "P", "B"):
            raw = data.pop(0)[1:]
            units.visibility = "m"
        # Kilometers are expanded to meters
        elif head.endswith("KM"):
            raw = f"{head[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Statute miles split over two elements: 2 1/2SM
        elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and head.isdigit():
            whole = data.pop(0)  # 2
            fraction = data.pop(0).replace("SM", "")  # 1/2
            numerator = int(whole) * int(fraction[2]) + int(fraction[0])
            raw = f"{numerator}{fraction[1:]}"  # 5/2
            units.visibility = "sm"
    return data, make_number(raw, m_minus=False)
Return the report list and removed visibility string.
def sanitize_cloud(cloud: str) -> str:
    """Repair uncommon formatting problems in a cloud layer string.

    Only the fourth character is inspected: a letter "O" there is replaced by
    a zero, and any other unexpected character is moved to the end.
    """
    if len(cloud) < 4:
        return cloud
    marker = cloud[3]
    # Digits, "/" and "-" are all valid in the fourth position
    if marker.isdigit() or marker in ("/", "-"):
        return cloud
    prefix, rest = cloud[:3], cloud[4:]
    # Bad "O": FEWO03 -> FEW003
    if marker == "O":
        return f"{prefix}0{rest}"
    # Leave a fourth-position "U" and BASE/UNKN prefixes untouched
    if marker == "U" or cloud[:4] in {"BASE", "UNKN"}:
        return cloud
    # Move modifiers to end: BKNC015 -> BKN015C
    return prefix + rest + marker
Fix rare cloud layer issues.
def make_cloud(cloud: str) -> Cloud:
    """Build a Cloud dataclass from a cloud layer string.

    The string is sanitized and stripped of "/" before being split into an
    optional top, type, base, and trailing modifier.
    This function assumes the input is potentially valid.
    """
    raw = cloud
    text = sanitize_cloud(cloud).replace("/", "")
    # Separate top: everything after the first matching marker
    top: str | None = None
    for marker in _TOP_OFFSETS:
        index = text.find(marker)
        if index > -1:
            top = text[index + len(marker) :]
            text = text[:index]
            break
    # Separate type
    layer_type = ""
    if text.startswith("BASES"):
        text = text[5:]
    elif text.startswith("BASE"):  # BASE027
        text = text[4:]
    elif text.startswith("VV"):  # VV003
        layer_type, text = text[:2], text[2:]
    elif len(text) >= 3 and text[:3] in CLOUD_LIST:  # FEW010
        layer_type, text = text[:3], text[3:]
    # Second type joined by a dash: BKN-OVC065
    if len(text) > 4 and text[0] == "-" and text[1:4] in CLOUD_LIST:
        layer_type += text[:4]
        text = text[4:]
    # Separate base
    base: str | None = None
    if len(text) >= 3 and text[:3].isdigit():
        base, text = text[:3], text[3:]
    elif text.startswith("UNKN"):
        text = text[4:]
    # Whatever is left is considered a modifier
    return Cloud(raw, layer_type or None, _null_or_int(base), _null_or_int(top), text or None)
Return a Cloud dataclass for a cloud string.
This function assumes the input is potentially valid.
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Return the report list and removed list of split cloud layers.

    Every element starting with a CLOUD_LIST code or "VV" is popped from data
    and converted with make_cloud. Layers are returned sorted by (base, type);
    if they cannot be compared (for example a base of None), they are returned
    in the order they appeared in the report.
    """
    clouds = []
    # Walk backwards so that popping does not shift indexes still to be visited
    for i, item in reversed(list(enumerate(data))):
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(i)))
    # Attempt cloud sort. Fails if None values are present.
    # sorted() is used instead of list.sort() because a failed in-place sort
    # may leave the list partially reordered, losing the report order.
    try:
        return data, sorted(clouds, key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        # Layers were collected back to front; flip to restore report order
        return data, clouds[::-1]
Return the report list and removed list of split cloud layers.
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return int based on current flight rules from parsed METAR data.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    A missing ceiling, or a ceiling whose base is None, is treated as 99.
    A base of 0 is a real ceiling and is used as is.

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Parse visibility
    vis: _Numeric
    if visibility is None:
        vis = 2
    elif visibility.repr == "CAVOK" or visibility.repr.startswith("P6"):
        vis = 10
    elif visibility.repr.startswith("M"):
        vis = 0
    elif visibility.value is None:
        vis = 2
    # Convert meters to miles
    elif len(visibility.repr) == 4:
        vis = (visibility.value or 0) * 0.000621371
    else:
        vis = visibility.value or 0
    # Parse ceiling. Only None means "no data"; 0 must not fall back to 99
    cld = 99 if ceiling is None or ceiling.base is None else ceiling.base
    # Determine flight rules, most restrictive first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR
Return int based on current flight rules from parsed METAR data.
0=VFR, 1=MVFR, 2=IFR, 3=LIFR
Note: Common practice is to report no higher than IFR if visibility unavailable.
def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return ceiling layer from Cloud-List or None if none found.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Layers without a base (eg. 'FEW///') are skipped to prevent errors due to
    lack of cloud information. A base of 0 is a valid ceiling and is returned.
    """
    for cloud in clouds:
        # Compare against None so a base of 0 still counts as a ceiling
        if cloud.base is not None and cloud.type in {"OVC", "BKN", "VV"}:
            return cloud
    return None
Return ceiling layer from Cloud-List or None if none found.
Assumes that the clouds are already sorted lowest to highest.
Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.
Prevents errors due to lack of cloud information (eg. '' or 'FEW///')
def is_altitude(value: str) -> bool:
    """Return True if the value could be an altitude.

    Accepts values of at least five characters that start with "SFC/", start
    with "FL" plus three digits, or whose part before the first "/" ends in
    three digits followed by "FT".
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/") or (value.startswith("FL") and value[2:5].isdigit()):
        return True
    head = value.partition("/")[0]
    return head.endswith("FT") and head[-5:-2].isdigit()
Return True if the value is a possible altitude.
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing "FT" or "M" is removed, stored in units.altitude (lowercased),
    and disables force_fl. A value like F430 is rewritten to FL430, and
    force_fl prefixes "FL" when it is not already there.

    Returns (None, units) for an empty value or one that is only a unit suffix.
    The repr passed to make_number is the repr argument, or the original value.
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            stripped = value.removesuffix(end)
            # Only a unit and no number: nothing to convert, and indexing
            # value[0] below would raise IndexError
            if not stripped:
                return None, units
            force_fl = False
            units.altitude = end.lower()
            value = stripped
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units
Convert altitude string into a number.
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, the value must be in hhmm format and the day is taken from
    the target date (or the current UTC date when no target is given).

    The month and year come from the target, then the month is adjusted when
    the result is more than hour_threshold hours away from the target.
    Returns None when the value cannot be parsed into a valid datetime.
    """
    digits = date.strip("Z")
    if not digits.isdigit():
        return None
    # Normalize to hhmm (time only) or ddhhmm and find where the hour starts
    if time_only:
        if len(digits) != 4:
            return None
        hour_at = 0
    else:
        if len(digits) == 4:
            digits += "00"
        if len(digits) != 6:
            return None
        hour_at = 2
    minute_at = hour_at + 2
    # Reference point: midnight UTC of the target date, or right now
    utc = dt.timezone.utc
    if target:
        anchor = dt.datetime(target.year, target.month, target.day, tzinfo=utc)
    else:
        anchor = dt.datetime.now(tz=utc)
    day = anchor.day if time_only else int(digits[:2])
    hour = int(digits[hour_at:minute_at])
    # A day that does not exist in the reference month belongs to the month before.
    # Remember the shift so that the month is not adjusted a second time below
    shifted = day > monthrange(anchor.year, anchor.month)[1]
    if shifted:
        anchor += relativedelta(months=-1)
    try:
        guess = anchor.replace(
            day=day,
            hour=hour % 24,
            minute=int(digits[minute_at : minute_at + 2]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        return None
    # An hour of 24 or more rolls over into the next day
    if hour > 23:
        guess += dt.timedelta(days=1)
    if shifted:
        return guess
    # Move a month back or forward when the guess is too far from the reference
    hourdiff = (guess - anchor) / dt.timedelta(minutes=1) / 60
    if hourdiff > hour_threshold:
        guess += relativedelta(months=-1)
    elif hourdiff < -hour_threshold:
        guess += relativedelta(months=+1)
    return guess
Parse a report timestamp in ddhhZ or ddhhmmZ format.
If time_only, assumes hhmm format with current or previous day.
This function assumes the given timestamp is within the hour threshold from current date.
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Build a Timestamp dataclass from a report timestamp in ddhhZ or ddhhmmZ format.

    Returns None for an empty or missing timestamp. Otherwise the Timestamp
    holds the original string together with the result of parse_date.
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None
Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.
def is_runway_visibility(item: str) -> bool:
    """Return True if the item looks like a runway visibility range string.

    The item must be longer than four characters, start with "R" and two
    digits, and have a "/" as its fourth or fifth character.
    """
    if len(item) <= 4 or item[0] != "R":
        return False
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 is a runway state, not a visibility range
    return item[1:3].isdigit() and "CLRD" not in item
Return True if the item is a runway visibility range string.