
Contains the core parsing and indent functions of avwx.

  1"""Contains the core parsing and indent functions of avwx."""
  3# stdlib
  4from __future__ import annotations
  6import datetime as dt
  7import math
  8import re
  9from calendar import monthrange
 10from contextlib import suppress
 11from copy import copy
 12from typing import TYPE_CHECKING, Any
 14# library
 15from dateutil.relativedelta import relativedelta
 17# module
 18from avwx.static.core import (
 20    CLOUD_LIST,
 24    WIND_UNITS,
 26from avwx.structs import Cloud, Fraction, Number, Timestamp, Units
 29    from collections.abc import Iterable
 32def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
 33    """Deduplicate a list while keeping order.
 35    If only_neighbors is True, dedupe will only check neighboring values.
 36    """
 37    ret: list[Any] = []
 38    for item in items:
 39        if (only_neighbors and ret and ret[-1] != item) or item not in ret:
 40            ret.append(item)
 41    return ret
 44def is_unknown(value: str) -> bool:
 45    """Return True if val represents and unknown value."""
 46    if not isinstance(value, str):
 47        raise TypeError
 48    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
 49        return True
 50    for char in value:
 51        if char not in ("/", "X", "."):
 52            break
 53    else:
 54        return True
 55    return False
 58def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
 59    """Return a list of items removed from a given list of strings
 60    that are all digits from 'from_index' until hitting a non-digit item.
 61    """
 62    ret = []
 63    data.pop(from_index)
 64    while len(data) > from_index and data[from_index].isdigit():
 65        ret.append(data.pop(from_index))
 66    return data, ret
 69def unpack_fraction(num: str) -> str:
 70    """Return unpacked fraction string 5/2 -> 2 1/2."""
 71    numbers = [int(n) for n in num.split("/") if n]
 72    if len(numbers) != 2 or numbers[0] <= numbers[1]:
 73        return num
 74    numerator, denominator = numbers
 75    over = numerator // denominator
 76    rem = numerator % denominator
 77    return f"{over} {rem}/{denominator}"
 80def remove_leading_zeros(num: str) -> str:
 81    """Strip zeros while handling -, M, and empty strings."""
 82    if not num:
 83        return num
 84    if num.startswith("M"):
 85        ret = "M" + num[1:].lstrip("0")
 86    elif num.startswith("-"):
 87        ret = "-" + num[1:].lstrip("0")
 88    else:
 89        ret = num.lstrip("0")
 90    return "0" if ret in ("", "M", "-") else ret
 94    (" zero zero zero", " thousand"),
 95    (" zero zero", " hundred"),
 99def spoken_number(num: str, *, literal: bool = False) -> str:
100    """Return the spoken version of a number.
102    If literal, no conversion to hundreds/thousands
104    Ex: 1.2 -> one point two
105        1 1/2 -> one and one half
106        25000 -> two five thousand
107    """
108    ret = []
109    for part in num.split():
110        if part in FRACTIONS:
111            ret.append(FRACTIONS[part])
112        else:
113            val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
114            if not literal:
115                for target, replacement in SPOKEN_POSTFIX:
116                    if val.endswith(target):
117                        val = val[: -len(target)] + replacement
118            ret.append(val)
119    return " and ".join(ret)
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a fraction dataclass for numbers with / in them.

    The text before "/" may carry a whole number in front of a single-digit
    numerator (e.g. "2 1/2" or "2-1/2"); the whole part is folded into the
    numerator. The Fraction's repr is 'repr' when given, otherwise the
    (possibly rewritten) "numerator/denominator" string. 'speak_prefix' is
    prepended to the spoken form.
    """
    head, denominator_text = num.split("/")
    # 2-1/2 but not -2 1/2
    if not head.startswith("-") and "-" in head:
        head = head.replace("-", " ")
    denominator = int(denominator_text)
    if len(head) > 1:
        # Last character is the numerator; everything before it is whole units
        numerator = int(head[:-1]) * denominator + int(head[-1])
        num = f"{numerator}/{denominator}"
    else:
        numerator = int(head)
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    If literal, spoken string will not convert to hundreds/thousands.

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.

    Args:
        num: Raw text to convert. Empty or unknown values yield None.
        repr: Representation stored on the result instead of the parsed text.
        speak: Text to speak instead of the parsed value (non-fraction path only).
        literal: Passed to spoken_number; forced True for "FL" values.
        special: Extra lookup of text -> value or (value, spoken) tuple, checked
            before SPECIAL_NUMBERS.
        m_minus: If True, "M" in the text is treated as a minus sign. If False,
            a result whose repr starts with "M" or "P" has its value set to None.

    Returns:
        A Number, a Fraction (when the text contains "/"), or None when there
        is nothing to parse.
    """
    if not num or is_unknown(num):
        return None
    # Check special
    # A KeyError from SPECIAL_NUMBERS means "not special" and falls through
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    # val_str is the text that gets parsed; num is cleaned separately below
    val_str = num
    # Remove unit suffixes
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        # Drop anything in front of the minus sign
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        # Overwrite float 0 due to "0.0" literal
        # Parsed as: (float(val_str) or 0) if "." in num else int(val_str)
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the lowest index in txt at which any item of str_list occurs.

    Returns -1 when none of the items is found.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = [pos for pos in (txt.find(item) for item in str_list) if pos > -1]
    return min(positions) if positions else -1
def is_timestamp(item: str) -> bool:
    """Return True if the item is six digits followed by "Z"."""
    if len(item) != 7:
        return False
    return item.endswith("Z") and item[:-1].isdigit()
def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF to-from time range: four digits, "/", four digits."""
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()
def is_possible_temp(temp: str) -> bool:
    """Return True if every character is a digit or 'M' for minus.

    An empty string has no offending characters and so returns True.
    """
    for char in temp:
        if char != "M" and not char.isdigit():
            return False
    return True
257_Numeric = int | float
260def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
261    """Calculate the relative humidity as a 0 to 1 percentage."""
263    def saturation(value: _Numeric) -> float:
264        """Return the saturation vapor pressure without the C constant for humidity calc."""
265        return math.exp((17.67 * value) / (243.5 + value))
267    if unit == "F":
268        dewpoint = (dewpoint - 32) * 5 / 9
269        temperature = (temperature - 32) * 5 / 9
270    return saturation(dewpoint) / saturation(temperature)
273# https://aviation.stackexchange.com/questions/47971/how-do-i-calculate-density-altitude-by-hand
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude in feet.

    The pressure is multiplied by 0.02953 when unit is "hPa"; any other unit
    value leaves it untouched. The result is (29.92 - pressure) * 1000 plus
    the altitude, rounded to an int.
    """
    inches = pressure * 0.02953 if unit == "hPa" else pressure
    return round((29.92 - inches) * 1000 + altitude)
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude in feet.

    Temperature is converted to Celsius when units.temperature is "F" and
    pressure is multiplied by 0.02953 when units.altimeter is "hPa". The
    result is the pressure altitude plus 120 for each degree the temperature
    sits above the reference value 15 - 2 * altitude / 1000, rounded to an int.
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    inches = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    reference_temperature = 15 - (2 * altitude / 1000)
    return round(((celsius - reference_temperature) * 120) + pressure_altitude(inches, altitude))
def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Return the report list and the removed station ident and time strings.

    The first item is always taken as the station. The next item is taken as
    the time when it is digits followed by "Z", or exactly six digits (in which
    case the "Z" is appended). The list is modified in place.
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    report_time: str | None = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            report_time = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            report_time = f"{data.pop(0)}Z"
    return data, station, report_time
def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element."""
    # Wind shear elements are never treated as wind
    if text.startswith("WS"):
        return False
    # With a unit: 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            if not text.endswith(ending):
                continue
            unit_index = text.find(ending)
            if text[unit_index - 2 : unit_index].isdigit():
                return True
    # Without a unit: 09010  09010G15 VRB10
    plain = len(text) == 5
    gusting = len(text) >= 8 and "G" in text and "/" not in text
    if not (plain or gusting):
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()
# Three digits, a literal "V", three digits
VARIABLE_DIRECTION_PATTERN = re.compile(r"\d{3}V\d{3}")


def is_variable_wind_direction(text: str) -> bool:
    """Return True if the first seven characters of the element look like 350V040."""
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
def separate_wind(text: str) -> tuple[str, str, str]:
    """Extract the direction, speed, and gust from a wind element.

    The gust is the two characters after the first "G" (three when the text
    contains "GP", as in 16006GP99). Of what is left, a two-character value is
    speed only (10G18); otherwise the first three characters are the direction
    and the rest is the speed. Missing parts are empty strings.
    """
    gust = ""
    g_index = text.find("G")
    if g_index > -1:
        gust_width = 3 if "GP" in text else 2
        gust_end = g_index + 1 + gust_width
        gust = text[g_index + 1 : gust_end]
        text = text[:g_index] + text[gust_end:]
    if not text:
        return "", "", gust
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Return the report list plus the wind direction, speed, gust, and variable direction list.

    Wind items are removed from the front of the list in place. When the wind
    element ends with a key of WIND_UNITS, units.wind_speed is set to the
    matching unit. Direction, speed, and gust come back as make_number results.
    """
    direction, speed, gust = "", "", ""
    variable: list[Number] = []
    # Main wind element, with its unit suffix removed
    if data and is_wind(data[0]):
        wind = data.pop(0)
        for suffix, unit in WIND_UNITS.items():
            if wind.endswith(suffix):
                units.wind_speed = unit
                wind = wind.replace(suffix, "")
                break
        direction, speed, gust = separate_wind(wind)
    # Gust reported as its own element, e.g. G15
    if data:
        candidate = data[0]
        if 1 < len(candidate) < 4 and candidate[0] == "G" and candidate[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable wind direction, e.g. 350V040
    if data and is_variable_wind_direction(data[0]):
        for bound in data.pop(0).split("V"):
            number = make_number(bound, speak=bound, literal=True)
            if number is not None:
                variable.append(number)
    # Convert the collected strings to Number
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Return the report list and the visibility removed from its front.

    The first item (or first two, for a split value like "2 1/2SM") is popped
    from the list in place when it looks like a visibility, and
    units.visibility is set to "sm" or "m" to match. The visibility text is
    converted with make_number(..., m_minus=False); when nothing matched, the
    empty string is passed to make_number.
    """
    visibility = ""
    if data:
        item = data[0]
        # Vis reported in statute miles
        if item.endswith("SM"):  # 10SM
            if item[:-2].isdigit():
                visibility = str(int(item[:-2]))
            elif "/" in item:
                visibility = item[: item.find("SM")]  # 1/2SM
            else:
                visibility = item[:-2]
            data.pop(0)
            units.visibility = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            units.visibility = "m"
        # Meters followed by a direction letter (or M) or NDV
        elif 7 >= len(item) >= 5 and item[:4].isdigit() and (item[4] in ["M", "N", "S", "E", "W"] or item[4:] == "NDV"):
            visibility = data.pop(0)[:4]
            units.visibility = "m"
        # Meters with a one-letter prefix
        elif len(item) == 5 and item[1:].isdigit() and item[0] in ["M", "P", "B"]:
            visibility = data.pop(0)[1:]
            units.visibility = "m"
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit():
            whole = data.pop(0)  # 2
            fraction = data.pop(0).replace("SM", "")  # 1/2
            # Hand over "whole numerator/denominator" rather than a pre-multiplied
            # improper fraction: make_fraction treats every digit before the last
            # numerator digit as whole units, so "15/8" would be re-read as 1 5/8
            # and single-character indexing breaks on multi-digit denominators.
            visibility = f"{whole} {fraction}"
            units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)
def sanitize_cloud(cloud: str) -> str:
    """Fix rare cloud layer issues found in the fourth character.

    Strings shorter than four characters, or whose fourth character is a
    digit, "/", or "-", are returned unchanged.
    """
    if len(cloud) < 4:
        return cloud
    fourth = cloud[3]
    if fourth.isdigit() or fourth in ("/", "-"):
        return cloud
    # Bad "O": FEWO03 -> FEW003
    if fourth == "O":
        return f"{cloud[:3]}0{cloud[4:]}"
    # Move modifiers to end: BKNC015 -> BKN015C
    if fourth != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return cloud[:3] + cloud[4:] + fourth
    return cloud
def _null_or_int(val: str | None) -> int | None:
    """Convert a string to int, returning None for non-strings and unknown values."""
    if isinstance(val, str) and not is_unknown(val):
        return int(val)
    return None
# Markers that separate a cloud base from its top; the longer one is tried first
_TOP_OFFSETS = ("-TOPS", "-TOP")


def make_cloud(cloud: str) -> Cloud:
    """Return a Cloud dataclass for a cloud string.

    This function assumes the input is potentially valid.

    The string is sanitized and stripped of "/" characters, then consumed from
    left to right: an optional top (after -TOP/-TOPS), the type, the base, and
    finally whatever remains, which becomes the modifier. The Cloud is built
    from the raw input, the type (or None), the base and top as ints (or
    None), and the modifier (or None).
    """
    raw_cloud = cloud
    cloud_type = ""
    base: str | None = None
    top: str | None = None
    cloud = sanitize_cloud(cloud).replace("/", "")
    # Separate top
    for target in _TOP_OFFSETS:
        topi = cloud.find(target)
        if topi > -1:
            # Everything after the marker is the top; everything before stays
            top, cloud = cloud[topi + len(target) :], cloud[:topi]
            break
    # Separate type
    ## BASE027
    if cloud.startswith("BASES"):
        cloud = cloud[5:]
    elif cloud.startswith("BASE"):
        cloud = cloud[4:]
    ## VV003
    elif cloud.startswith("VV"):
        cloud_type, cloud = cloud[:2], cloud[2:]
    ## FEW010
    elif len(cloud) >= 3 and cloud[:3] in CLOUD_LIST:
        cloud_type, cloud = cloud[:3], cloud[3:]
    ## BKN-OVC065
    # A second "-XXX" type is appended to the first, dash included
    if len(cloud) > 4 and cloud[0] == "-" and cloud[1:4] in CLOUD_LIST:
        cloud_type += cloud[:4]
        cloud = cloud[4:]
    # Separate base
    if len(cloud) >= 3 and cloud[:3].isdigit():
        base, cloud = cloud[:3], cloud[3:]
    elif len(cloud) >= 4 and cloud[:4] == "UNKN":
        # Unknown base: drop the marker and leave base as None
        cloud = cloud[4:]
    # Remainder is considered modifiers
    modifier = cloud or None
    # Make Cloud
    return Cloud(raw_cloud, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier)
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Return the report list and removed list of split cloud layers.

    Items whose first three characters are in CLOUD_LIST, or that start with
    "VV", are popped from the list in place and converted with make_cloud.
    The layers are returned sorted by (base, type); when those values cannot
    be compared (e.g. a None base), they are returned in report order instead.
    """
    clouds = []
    # Walk backwards so popping does not shift the indexes still to be visited
    for i in range(len(data) - 1, -1, -1):
        item = data[i]
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(i)))
    # Attempt cloud sort. Fails if None values are present.
    # sorted() builds a new list, so a failed comparison leaves `clouds` exactly
    # as collected; an in-place sort may leave it partially reordered, which
    # would make the reverse below return a scrambled order.
    try:
        clouds = sorted(clouds, key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        clouds.reverse()  # Restores original report order
    return data, clouds
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return int based on current flight rules from parsed METAR data.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable.

    Only the repr and value of the visibility and the base of the ceiling are
    read. A missing ceiling (or one without a base) is treated as base 99.
    """
    # Parse visibility
    vis: _Numeric
    if visibility is None:
        vis = 2
    elif visibility.repr == "CAVOK" or visibility.repr.startswith("P6"):
        vis = 10
    elif visibility.repr.startswith("M"):
        vis = 0
    elif visibility.value is None:
        vis = 2
    # Convert meters to miles. Only a four-digit repr is a meter value; without
    # the digit check, four-character mile values such as "13/8", "1/16", or
    # "10SM" would be scaled down as if they were meters.
    elif len(visibility.repr) == 4 and visibility.repr.isdigit():
        vis = (visibility.value or 0) * 0.000621371
    else:
        vis = visibility.value or 0
    # Parse ceiling
    cld = (ceiling.base if ceiling else 99) or 99
    # Determine flight rules, most restrictive category first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR
def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return the first ceiling layer in the list, or None if there is none.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (eg. '' or 'FEW///')
    """
    for layer in clouds:
        if layer.base and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None
def is_altitude(value: str) -> bool:
    """Return True if the value is a possible altitude.

    Accepts values of at least five characters that start with "SFC/", start
    with "FL" plus three digits, or whose part before the first "/" ends in
    digits followed by "FT".
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/"):
        return True
    if value.startswith("FL") and value[2:5].isdigit():
        return True
    lower = value.split("/")[0]
    return lower.endswith("FT") and lower[-5:-2].isdigit()
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing "FT" or "M" is removed, recorded in units.altitude (lowercased),
    and cancels force_fl. "F430"-style values are rewritten to "FL430", and
    force_fl adds an "FL" prefix when one is missing. The original text (or
    'repr' when given) is used as the repr of the resulting number.

    Returns (None, units) when the value is empty or holds nothing but a unit
    suffix.
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            force_fl = False
            units.altitude = end.lower()
            value = value.removesuffix(end)
    # A bare unit such as "M" or "FT" leaves nothing to parse; indexing the
    # empty remainder would raise IndexError
    if not value:
        return None, units
    # F430
    if value.startswith("F") and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and not value.startswith("FL"):
        value = f"FL{value}"
    return make_number(value, repr=raw), units
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, assumes hhmm format with current or previous day.

    This function assumes the given timestamp is within the hour threshold from current date.

    Args:
        date: Timestamp text; leading/trailing "Z" characters are stripped.
        hour_threshold: Largest allowed gap, in hours, between the result and
            the reference date before the result is moved by one month.
        time_only: Treat the text as hhmm and take the day from the reference date.
        target: Reference date (taken as midnight UTC). Defaults to the current
            UTC time.

    Returns:
        A timezone-aware datetime, or None when the text is not all digits,
        has the wrong length, or does not form a valid date.
    """
    # Format date string
    date = date.strip("Z")
    if not date.isdigit():
        return None
    if time_only:
        if len(date) != 4:
            return None
        index_hour = 0
    else:
        # ddhh is padded with zero minutes to ddhhmm
        if len(date) == 4:
            date += "00"
        if len(date) != 6:
            return None
        index_hour = 2
    # Create initial guess
    if target:
        target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        target = dt.datetime.now(tz=dt.timezone.utc)
    day = target.day if time_only else int(date[:2])
    hour = int(date[index_hour : index_hour + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = False
    if day > monthrange(target.year, target.month)[1]:
        target += relativedelta(months=-1)
        shifted = True
    try:
        # Out-of-range hours and minutes are wrapped here; hour overflow is
        # carried into the next day below
        guess = target.replace(
            day=day,
            hour=hour % 24,
            minute=int(date[index_hour + 2 : index_hour + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        # The day does not exist in the reference month
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not shifted:
        # Signed distance from the reference time, in hours
        hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            guess += relativedelta(months=+1)
    return guess
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.

    Returns None for an empty or missing timestamp. Otherwise the Timestamp
    holds the original text and whatever parse_date produced for it, which
    may itself be None.
    """
    if not timestamp:
        return None
    parsed = parse_date(timestamp, time_only=time_only, target=target_date)
    return Timestamp(timestamp, parsed)
def is_runway_visibility(item: str) -> bool:
    """Return True if the item is a runway visibility range string.

    The item must be longer than four characters, start with "R" plus two
    digits, have a "/" as its fourth or fifth character, and not contain
    "CLRD" (R28/CLRD70 is a runway state, not a visibility).
    """
    if len(item) <= 4 or item[0] != "R":
        return False
    if "/" not in (item[3], item[4]):
        return False
    return item[1:3].isdigit() and "CLRD" not in item
