avwx.parsing.core

Contains the core parsing and indent functions of avwx.

  1"""Contains the core parsing and indent functions of avwx."""
  2
  3# stdlib
  4from __future__ import annotations
  5
  6import datetime as dt
  7import math
  8import re
  9from calendar import monthrange
 10from contextlib import suppress
 11from copy import copy
 12from typing import TYPE_CHECKING, Any
 13
 14# library
 15from dateutil.relativedelta import relativedelta
 16
 17# module
 18from avwx.static.core import (
 19    CARDINALS,
 20    CLOUD_LIST,
 21    FRACTIONS,
 22    NUMBER_REPL,
 23    SPECIAL_NUMBERS,
 24    WIND_UNITS,
 25)
 26from avwx.structs import Cloud, Fraction, Number, Timestamp, Units
 27
 28if TYPE_CHECKING:
 29    from collections.abc import Iterable
 30
 31
 32def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
 33    """Deduplicate a list while keeping order.
 34
 35    If only_neighbors is True, dedupe will only check neighboring values.
 36    """
 37    ret: list[Any] = []
 38    for item in items:
 39        if (only_neighbors and ret and ret[-1] != item) or item not in ret:
 40            ret.append(item)
 41    return ret
 42
 43
 44def is_unknown(value: str) -> bool:
 45    """Return True if val represents and unknown value."""
 46    if not isinstance(value, str):
 47        raise TypeError
 48    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
 49        return True
 50    for char in value:
 51        if char not in ("/", "X", "."):
 52            break
 53    else:
 54        return True
 55    return False
 56
 57
 58def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
 59    """Return a list of items removed from a given list of strings
 60    that are all digits from 'from_index' until hitting a non-digit item.
 61    """
 62    ret = []
 63    data.pop(from_index)
 64    while len(data) > from_index and data[from_index].isdigit():
 65        ret.append(data.pop(from_index))
 66    return data, ret
 67
 68
 69def unpack_fraction(num: str) -> str:
 70    """Return unpacked fraction string 5/2 -> 2 1/2."""
 71    numbers = [int(n) for n in num.split("/") if n]
 72    if len(numbers) != 2 or numbers[0] <= numbers[1]:
 73        return num
 74    numerator, denominator = numbers
 75    over = numerator // denominator
 76    rem = numerator % denominator
 77    return f"{over} {rem}/{denominator}"
 78
 79
 80def remove_leading_zeros(num: str) -> str:
 81    """Strip zeros while handling -, M, and empty strings."""
 82    if not num:
 83        return num
 84    if num.startswith("M"):
 85        ret = "M" + num[1:].lstrip("0")
 86    elif num.startswith("-"):
 87        ret = "-" + num[1:].lstrip("0")
 88    else:
 89        ret = num.lstrip("0")
 90    return "0" if ret in ("", "M", "-") else ret
 91
 92
 93SPOKEN_POSTFIX = (
 94    (" zero zero zero", " thousand"),
 95    (" zero zero", " hundred"),
 96)
 97
 98
 99def spoken_number(num: str, *, literal: bool = False) -> str:
100    """Return the spoken version of a number.
101
102    If literal, no conversion to hundreds/thousands
103
104    Ex: 1.2 -> one point two
105        1 1/2 -> one and one half
106        25000 -> two five thousand
107    """
108    ret = []
109    for part in num.split():
110        if part in FRACTIONS:
111            ret.append(FRACTIONS[part])
112        else:
113            val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
114            if not literal:
115                for target, replacement in SPOKEN_POSTFIX:
116                    if val.endswith(target):
117                        val = val[: -len(target)] + replacement
118            ret.append(val)
119    return " and ".join(ret)
120
121
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a Fraction dataclass for a number string containing exactly one "/".

    The last character before the "/" is the numerator digit; anything before
    it is treated as a whole-number part and folded into the numerator.
    ``repr`` overrides the stored representation, ``literal`` is forwarded to
    spoken_number, and ``speak_prefix`` is prepended to the spoken text.
    """
    leading, denominator_text = num.split("/")
    # 2-1/2 but not -2 1/2
    if not leading.startswith("-") and "-" in leading:
        leading = leading.replace("-", " ")
    denominator = int(denominator_text)
    if len(leading) > 1:
        # Fold the whole-number part into the numerator
        whole_text, digit_text = leading[:-1], leading[-1]
        numerator = int(whole_text) * denominator + int(digit_text)
        num = f"{numerator}/{denominator}"
    else:
        numerator = int(leading)
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)
145
146
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    Returns None when ``num`` is empty, is an unknown marker (see is_unknown),
    or has nothing left after its prefixes are removed.

    ``repr`` overrides the stored representation. ``speak`` overrides the text
    passed to spoken_number for non-fraction values. If literal, spoken string
    will not convert to hundreds/thousands. ``special`` maps strings to a value
    or a ``(value, spoken)`` tuple and is consulted before SPECIAL_NUMBERS.

    If ``m_minus`` is True, an "M" inside the string is read as a minus sign.
    If it is False, a leading "M"/"P" is spoken as "less than"/"greater than"
    and the resulting value is set to None.

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special. A missing key raises KeyError inside the block, which is
    # suppressed so parsing continues below. A falsy entry in ``special`` also
    # falls through to SPECIAL_NUMBERS because of the ``or``.
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction: keep the original text as repr and continue
    # with the mapped CARDINALS value
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes, keeping the suffixed text as repr
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE(review): this cleanup is applied to ``num`` only; ``val_str`` picks
    # it up solely through the minus branch below - confirm that is intended.
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04: everything before the first
    # minus sign is dropped
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes. Each is tested once, in this order, and stripped
    # from val_str while its spoken equivalent is accumulated.
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        # Flight levels are always spoken digit by digit
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        # Overwrite float 0 due to "0.0" literal
        # Parsed as (float(val_str) or 0) when "." is in num, else int(val_str)
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret
228
229
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the lowest index in ``txt`` at which any item of ``str_list`` occurs.

    Returns -1 when none of the items is found.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = [index for index in map(txt.find, str_list) if index > -1]
    return min(positions, default=-1)
240
241
def is_timestamp(item: str) -> bool:
    """Return True if the item is six digits followed by "Z"."""
    if len(item) != 7:
        return False
    return item.endswith("Z") and item[:6].isdigit()
245
246
def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF from/to time range: four digits, "/", four digits."""
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()
250
251
def is_possible_temp(temp: str) -> bool:
    """Return True if every character is a digit or 'M' (minus).

    An empty string has no disqualifying characters and returns True.
    """
    for char in temp:
        if char != "M" and not char.isdigit():
            return False
    return True
255
256
257_Numeric = int | float
258
259
260def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
261    """Calculate the relative humidity as a 0 to 1 percentage."""
262
263    def saturation(value: _Numeric) -> float:
264        """Return the saturation vapor pressure without the C constant for humidity calc."""
265        return math.exp((17.67 * value) / (243.5 + value))
266
267    if unit == "F":
268        dewpoint = (dewpoint - 32) * 5 / 9
269        temperature = (temperature - 32) * 5 / 9
270    return saturation(dewpoint) / saturation(temperature)
271
272
273# https://aviation.stackexchange.com/questions/47971/how-do-i-calculate-density-altitude-by-hand
274
275
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude, rounded to an int.

    ``pressure`` is scaled by 0.02953 (hPa to inHg) when ``unit`` is "hPa".
    The result is ``altitude`` plus 1000 per inHg below 29.92.
    """
    inches = pressure * 0.02953 if unit == "hPa" else pressure
    return round((29.92 - inches) * 1000 + altitude)
281
282
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude, rounded to an int.

    ``temperature`` is converted to Celsius when ``units.temperature`` is "F",
    and ``pressure`` is scaled to inHg when ``units.altimeter`` is "hPa".
    The result is the pressure altitude plus 120 per degree that the
    temperature is above the standard value ``15 - 2 * altitude / 1000``.
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    inches = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    standard_temperature = 15 - (2 * altitude / 1000)
    return round(((celsius - standard_temperature) * 120) + pressure_altitude(inches, altitude))
292
293
def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Pop the station ident and, if present, the report time off the front of ``data``.

    Returns the mutated list, the station (None for empty input), and the time
    string (None when the next item is not a time). A six-digit time without a
    trailing "Z" is returned with the "Z" added.
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    report_time = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            report_time = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            report_time = f"{data.pop(0)}Z"
    return data, station, report_time
309
310
def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element."""
    # Wind shear is not a normal wind element
    if text.startswith("WS"):
        return False
    # 09010KT, 09010G15KT: known unit suffix preceded by two digits
    if len(text) > 4:
        for unit in WIND_UNITS:
            if not text.endswith(unit):
                continue
            position = text.find(unit)
            if text[position - 2 : position].isdigit():
                return True
    # 09010  09010G15 VRB10: unit-less forms
    plain = len(text) == 5
    with_gust = len(text) >= 8 and "G" in text and "/" not in text
    if not (plain or with_gust):
        return False
    return text[:5].isdigit() or (text.startswith("VRB") and text[3:5].isdigit())
326
327
# Three digits, "V", three digits: a variable wind direction range
VARIABLE_DIRECTION_PATTERN = re.compile(r"\d{3}V\d{3}")


def is_variable_wind_direction(text: str) -> bool:
    """Return True if the first seven characters of the element look like 350V040."""
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
336
337
def separate_wind(text: str) -> tuple[str, str, str]:
    """Split a unit-less wind element into (direction, speed, gust) strings.

    Missing parts are returned as empty strings.
    """
    gust = ""
    g_index = text.find("G")
    if g_index > -1:
        # Gust is two characters, or three for 16006GP99KT ie gust greater than
        gust_end = g_index + (4 if "GP" in text else 3)
        gust = text[g_index + 1 : gust_end]
        text = text[:g_index] + text[gust_end:]
    # 10G18KT: only a speed remains once the gust is removed
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust
358
359
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Pop wind elements off the front of ``data`` and convert them to numbers.

    Returns the mutated report list, the direction, speed, and gust values,
    and the list of variable direction bounds. ``units.wind_speed`` is updated
    when the wind element ends in a known unit.
    """
    direction = speed = gust = ""
    variable: list[Number] = []
    # Main wind element: strip the unit, then split into its parts
    if data and is_wind(data[0]):
        element = data[0]
        for suffix, unit in WIND_UNITS.items():
            if element.endswith(suffix):
                units.wind_speed = unit
                element = element.replace(suffix, "")
                break
        direction, speed, gust = separate_wind(element)
        data.pop(0)
    # Separated Gust: G plus one or two digits as its own element
    if data:
        head = data[0]
        if len(head) in (2, 3) and head[0] == "G" and head[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable Wind Direction
    if data and is_variable_wind_direction(data[0]):
        for bound in data.pop(0).split("V"):
            parsed = make_number(bound, speak=bound, literal=True)
            if parsed is not None:
                variable.append(parsed)
    # Convert to Number
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )
397
398
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Pop the visibility element off the front of ``data`` and convert it to a number.

    Returns the mutated report list and the visibility value. ``units.visibility``
    is set to "sm" or "m" according to the format that matched.
    """
    visibility = ""
    if not data:
        return data, make_number(visibility, m_minus=False)
    item = data[0]
    # Vis reported in statute miles
    if item.endswith("SM"):  # 10SM
        miles = item[:-2]
        if miles.isdigit():
            visibility = str(int(miles))
        elif "/" in item:
            visibility = item[: item.find("SM")]  # 1/2SM
        else:
            visibility = miles
        data.pop(0)
        units.visibility = "sm"
    # Vis reported in meters
    elif len(item) == 4 and item.isdigit():
        visibility = data.pop(0)
        units.visibility = "m"
    # Four digits followed by a direction letter or NDV
    elif 5 <= len(item) <= 7 and item[:4].isdigit() and (item[4] in "MNSEW" or item[4:] == "NDV"):
        visibility = data.pop(0)[:4]
        units.visibility = "m"
    # Single-letter prefix followed by four digits
    elif len(item) == 5 and item[0] in "MPB" and item[1:].isdigit():
        visibility = data.pop(0)[1:]
        units.visibility = "m"
    elif item.endswith("KM"):
        visibility = f"{item[:-2]}000"
        data.pop(0)
        units.visibility = "m"
    # Vis statute miles but split Ex: 2 1/2SM
    elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit():
        whole = data.pop(0)  # 2
        fraction = data.pop(0).replace("SM", "")  # 1/2
        # Fold the whole miles into the numerator: 5/2
        visibility = str(int(whole) * int(fraction[2]) + int(fraction[0])) + fraction[1:]
        units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)
435
436
def sanitize_cloud(cloud: str) -> str:
    """Fix rare cloud layer issues involving the fourth character."""
    if len(cloud) < 4:
        return cloud
    fourth = cloud[3]
    if fourth.isdigit() or fourth in ("/", "-"):
        return cloud
    # Bad "O": FEWO03 -> FEW003
    if fourth == "O":
        return f"{cloud[:3]}0{cloud[4:]}"
    # Move modifiers to end: BKNC015 -> BKN015C
    if fourth != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return cloud[:3] + cloud[4:] + fourth
    return cloud
449
450
def _null_or_int(val: str | None) -> int | None:
    """Return ``val`` as an int, or None when it is not a string or is an unknown marker."""
    if isinstance(val, str) and not is_unknown(val):
        return int(val)
    return None
454
455
# Markers that introduce a cloud top; the longer form is tried first
_TOP_OFFSETS = ("-TOPS", "-TOP")


def make_cloud(cloud: str) -> Cloud:
    """Return a Cloud dataclass for a cloud string.

    The string is split into type, base, top, and trailing modifier; parts
    that are missing or unknown become None.

    This function assumes the input is potentially valid.
    """
    remaining = sanitize_cloud(cloud).replace("/", "")
    # Separate top
    top: str | None = None
    for marker in _TOP_OFFSETS:
        before, found, after = remaining.partition(marker)
        if found:
            remaining, top = before, after
            break
    # Separate type
    cloud_type = ""
    if remaining.startswith("BASES"):
        remaining = remaining[5:]
    elif remaining.startswith("BASE"):  # BASE027
        remaining = remaining[4:]
    elif remaining.startswith("VV"):  # VV003
        cloud_type, remaining = remaining[:2], remaining[2:]
    elif len(remaining) >= 3 and remaining[:3] in CLOUD_LIST:  # FEW010
        cloud_type, remaining = remaining[:3], remaining[3:]
    # BKN-OVC065: a second type joined with a dash
    if len(remaining) > 4 and remaining[0] == "-" and remaining[1:4] in CLOUD_LIST:
        cloud_type += remaining[:4]
        remaining = remaining[4:]
    # Separate base
    base: str | None = None
    if len(remaining) >= 3 and remaining[:3].isdigit():
        base, remaining = remaining[:3], remaining[3:]
    elif remaining.startswith("UNKN"):
        remaining = remaining[4:]
    # Remainder is considered modifiers
    return Cloud(
        cloud,
        cloud_type or None,
        _null_or_int(base),
        _null_or_int(top),
        remaining or None,
    )
500
501
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Remove cloud layer elements from ``data`` and return them as Cloud objects.

    Returns the mutated report list and the cloud list, sorted by (base, type)
    when possible and otherwise left in report order.
    """
    clouds = []
    # Walk backwards so popping does not shift the indexes still to visit
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(index)))
    # Attempt cloud sort. Fails if None values are present
    try:
        clouds.sort(key=lambda layer: (layer.base, layer.type))
    except TypeError:
        clouds.reverse()  # Restores original report order
    return data, clouds
515
516
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return the flight rules index for parsed visibility and ceiling data.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Parse visibility
    vis: _Numeric
    if visibility is None:
        vis = 2
    else:
        shown = visibility.repr
        if shown == "CAVOK" or shown.startswith("P6"):
            vis = 10
        elif shown.startswith("M"):
            vis = 0
        elif visibility.value is None:
            vis = 2
        elif len(shown) == 4:
            # Convert meters to miles
            vis = (visibility.value or 0) * 0.000621371
        else:
            vis = visibility.value or 0
    # Parse ceiling: a missing layer or base counts as 99
    cld = (ceiling.base if ceiling else 99) or 99
    # Determine flight rules, most restrictive first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR
549
550
def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return the first ceiling layer in the cloud list, or None if there is none.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (eg. '' or 'FEW///').
    """
    for layer in clouds:
        if layer.base and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None
561
562
def is_altitude(value: str) -> bool:
    """Return True if the value is a possible altitude.

    Accepts values of at least five characters that start with "SFC/", start
    with "FL" plus three digits, or whose part before the first "/" ends in
    three digits plus "FT".
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/"):
        return True
    if value.startswith("FL") and value[2:5].isdigit():
        return True
    first = value.split("/")[0]
    return first.endswith("FT") and first[-5:-2].isdigit()
573
574
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert an altitude string into a number.

    A trailing "FT" or "M" is stripped and recorded, lowercased, in
    ``units.altitude``. Values like "F430" are rewritten as "FL430". When
    ``force_fl`` is True and no unit suffix was found, "FL" is prepended to
    values that lack it.

    Returns the parsed number (None for empty input, including input that is
    only a unit suffix) and the ``units`` object. ``repr`` overrides the stored
    representation, which otherwise is the original ``value``.
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            # An explicit unit means the value is not a flight level
            force_fl = False
            units.altitude = end.lower()
            value = value.removesuffix(end)
    # F430
    # startswith, not value[0]: a bare unit such as "FT" is empty by this
    # point and indexing it would raise IndexError
    if value.startswith("F") and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units
597
598
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, the timestamp must be in hhmm format and the day is taken
    from the target date.

    ``target`` is the reference date, used at midnight UTC; the current UTC
    time is used when it is not given. The month and year of the result start
    from the reference and are moved one month back or forward when the first
    guess is more than ``hour_threshold`` hours away from it.

    Returns None when the timestamp is not all digits, has the wrong length,
    or does not form a valid date.

    This function assumes the given timestamp is within the hour threshold from current date.
    """
    # Format date string: drop the "Z" marker before validating the digits
    date = date.strip("Z")
    if not date.isdigit():
        return None
    if time_only:
        if len(date) != 4:
            return None
        index_hour = 0
    else:
        # ddhh is padded with zero minutes to become ddhhmm
        if len(date) == 4:
            date += "00"
        if len(date) != 6:
            return None
        index_hour = 2
    # Create initial guess
    if target:
        target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        target = dt.datetime.now(tz=dt.timezone.utc)
    day = target.day if time_only else int(date[:2])
    hour = int(date[index_hour : index_hour + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = False
    if day > monthrange(target.year, target.month)[1]:
        target += relativedelta(months=-1)
        shifted = True
    try:
        # Hour and minute are wrapped into range here; an hour above 23 is
        # carried into the next day below
        guess = target.replace(
            day=day,
            hour=hour % 24,
            minute=int(date[index_hour + 2 : index_hour + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        # The day does not exist in the (possibly shifted) target month
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not shifted:
        # Signed distance from the reference to the guess, in hours
        hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            guess += relativedelta(months=+1)
    return guess
660
661
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.

    Returns None for empty input. ``time_only`` and ``target_date`` are passed
    through to parse_date.
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None
673
674
def is_runway_visibility(item: str) -> bool:
    """Return True if the item is a runway visibility range string.

    The item must be longer than four characters, start with "R" and two
    digits, and have a "/" as its fourth or fifth character.
    """
    if len(item) <= 4 or item[0] != "R":
        return False
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 Runway State
    return item[1:3].isdigit() and "CLRD" not in item
def dedupe( items: Iterable[typing.Any], *, only_neighbors: bool = False) -> list[typing.Any]:
def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
    """Return the values of ``items`` with duplicates removed, keeping first-seen order.

    When ``only_neighbors`` is True, a value is also kept whenever it differs
    from the most recently kept value, so only consecutive repeats are dropped.
    """
    unique: list[Any] = []
    for candidate in items:
        differs_from_last = only_neighbors and unique and unique[-1] != candidate
        # Membership uses a list scan so unhashable values are supported
        if not differs_from_last and candidate in unique:
            continue
        unique.append(candidate)
    return unique

Deduplicate a list while keeping order.

If only_neighbors is True, dedupe will only check neighboring values.

def is_unknown(value: str) -> bool:
def is_unknown(value: str) -> bool:
    """Return True if ``value`` represents an unknown value.

    Empty strings, the markers UNKN/UNK/UKN (any case), and strings made up
    only of "/", "X", and "." characters count as unknown.

    Raises TypeError if ``value`` is not a string.
    """
    if not isinstance(value, str):
        raise TypeError
    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
        return True
    return all(char in ("/", "X", ".") for char in value)

Return True if val represents an unknown value.

def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
    """Remove the item at ``from_index`` and the run of all-digit items that follows it.

    The item at ``from_index`` itself is discarded. Returns the mutated input
    list and the list of digit items that were removed.
    """
    data.pop(from_index)
    digits: list[str] = []
    while from_index < len(data):
        if not data[from_index].isdigit():
            break
        digits.append(data.pop(from_index))
    return data, digits

Return a list of items removed from a given list of strings that are all digits from 'from_index' until hitting a non-digit item.

def unpack_fraction(num: str) -> str:
def unpack_fraction(num: str) -> str:
    """Return the mixed-number form of an improper fraction string: 5/2 -> 2 1/2.

    Strings that are not two "/"-separated integers, or whose numerator is not
    greater than the denominator, are returned unchanged.
    """
    parts = [int(piece) for piece in num.split("/") if piece]
    if len(parts) == 2 and parts[0] > parts[1]:
        whole, remainder = divmod(parts[0], parts[1])
        return f"{whole} {remainder}/{parts[1]}"
    return num

Return unpacked fraction string 5/2 -> 2 1/2.

def remove_leading_zeros(num: str) -> str:
def remove_leading_zeros(num: str) -> str:
    """Strip leading zeros while keeping a leading "M" or "-" sign marker.

    Empty input is returned as-is; a value that strips down to nothing (or to
    just the sign marker) becomes "0".
    """
    if not num:
        return num
    sign = num[0] if num[0] in ("M", "-") else ""
    stripped = sign + num[len(sign) :].lstrip("0")
    return "0" if stripped in ("", "M", "-") else stripped

Strip zeros while handling -, M, and empty strings.

SPOKEN_POSTFIX = ((' zero zero zero', ' thousand'), (' zero zero', ' hundred'))
def spoken_number(num: str, *, literal: bool = False) -> str:
def spoken_number(num: str, *, literal: bool = False) -> str:
    """Return the spoken version of a number.

    Each whitespace-separated part is spoken on its own and the parts are
    joined with "and". If literal, trailing zeros are not converted to
    hundreds/thousands.

    Ex: 1.2 -> one point two
        1 1/2 -> one and one half
        25000 -> two five thousand
    """

    def speak_part(part: str) -> str:
        """Return the spoken form of a single whitespace-free part."""
        if part in FRACTIONS:
            return FRACTIONS[part]
        # Characters without a spoken replacement are skipped
        words = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
        if literal:
            return words
        for suffix, replacement in SPOKEN_POSTFIX:
            if words.endswith(suffix):
                words = words[: -len(suffix)] + replacement
        return words

    return " and ".join(speak_part(part) for part in num.split())

Return the spoken version of a number.

If literal, no conversion to hundreds/thousands

Ex: 1.2 -> one point two; 1 1/2 -> one and one half; 25000 -> two five thousand

def make_fraction( num: str, repr: str | None = None, *, literal: bool = False, speak_prefix: str = '') -> avwx.structs.Fraction:
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a Fraction dataclass for a number string containing exactly one "/".

    The last character before the "/" is the numerator digit; anything before
    it is treated as a whole-number part and folded into the numerator.
    ``repr`` overrides the stored representation, ``literal`` is forwarded to
    spoken_number, and ``speak_prefix`` is prepended to the spoken text.
    """
    leading, denominator_text = num.split("/")
    # 2-1/2 but not -2 1/2
    if not leading.startswith("-") and "-" in leading:
        leading = leading.replace("-", " ")
    denominator = int(denominator_text)
    if len(leading) > 1:
        # Fold the whole-number part into the numerator
        whole_text, digit_text = leading[:-1], leading[-1]
        numerator = int(whole_text) * denominator + int(digit_text)
        num = f"{numerator}/{denominator}"
    else:
        numerator = int(leading)
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)

Return a fraction dataclass for numbers with / in them.

def make_number( num: str | None, repr: str | None = None, speak: str | None = None, *, literal: bool = False, special: dict | None = None, m_minus: bool = True) -> avwx.structs.Number | avwx.structs.Fraction | None:
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    Returns None when ``num`` is empty, is an unknown marker (see is_unknown),
    or has nothing left after its prefixes are removed.

    ``repr`` overrides the stored representation. ``speak`` overrides the text
    passed to spoken_number for non-fraction values. If literal, spoken string
    will not convert to hundreds/thousands. ``special`` maps strings to a value
    or a ``(value, spoken)`` tuple and is consulted before SPECIAL_NUMBERS.

    If ``m_minus`` is True, an "M" inside the string is read as a minus sign.
    If it is False, a leading "M"/"P" is spoken as "less than"/"greater than"
    and the resulting value is set to None.

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special. A missing key raises KeyError inside the block, which is
    # suppressed so parsing continues below. A falsy entry in ``special`` also
    # falls through to SPECIAL_NUMBERS because of the ``or``.
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction: keep the original text as repr and continue
    # with the mapped CARDINALS value
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes, keeping the suffixed text as repr
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE(review): this cleanup is applied to ``num`` only; ``val_str`` picks
    # it up solely through the minus branch below - confirm that is intended.
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04: everything before the first
    # minus sign is dropped
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes. Each is tested once, in this order, and stripped
    # from val_str while its spoken equivalent is accumulated.
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        # Flight levels are always spoken digit by digit
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        # Overwrite float 0 due to "0.0" literal
        # Parsed as (float(val_str) or 0) when "." is in num, else int(val_str)
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret

Return a Number or Fraction dataclass for a number string.

If literal, spoken string will not convert to hundreds/thousands.

NOTE: Numerators are assumed to have a single digit. Any additional leading digits are treated as the whole-number part.

def find_first_in_list(txt: str, str_list: list[str]) -> int:
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Locate the left-most position in ``txt`` where any candidate begins.

    Every string in ``str_list`` is searched for in ``txt``; the smallest
    matching index is returned, or -1 when none of them occur.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    # Keep only the candidates that were actually found
    positions = [index for index in map(txt.find, str_list) if index > -1]
    return min(positions, default=-1)

Return the index of the earliest occurrence of an item from a list in a string.

Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3

def is_timestamp(item: str) -> bool:
def is_timestamp(item: str) -> bool:
    """Check for a seven character token made of six digits followed by 'Z'."""
    if len(item) != 7:
        return False
    digits, suffix = item[:-1], item[-1]
    return suffix == "Z" and digits.isdigit()

Return True if the item matches the timestamp format.

def is_timerange(item: str) -> bool:
def is_timerange(item: str) -> bool:
    """Check for a nine character 'dddd/dddd' token, as used for TAF time ranges."""
    if len(item) != 9 or item[4] != "/":
        return False
    start, end = item[:4], item[5:]
    return start.isdigit() and end.isdigit()

Return True if the item is a TAF to-from time range.

def is_possible_temp(temp: str) -> bool:
def is_possible_temp(temp: str) -> bool:
    """Check that the text contains nothing other than digits and 'M' (minus).

    An empty string has no offending characters and therefore passes.
    """
    for symbol in temp:
        if symbol != "M" and not symbol.isdigit():
            return False
    return True

Return True if all characters are digits or 'M' for minus.

def relative_humidity( temperature: int | float, dewpoint: int | float, unit: str = 'C') -> float:
def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
    """Compute relative humidity as a ratio (1.0 when dewpoint equals temperature).

    Inputs are taken as Celsius unless unit is "F", in which case both are
    converted to Celsius first.
    """
    if unit == "F":
        dewpoint = (dewpoint - 32) * 5 / 9
        temperature = (temperature - 32) * 5 / 9
    # Saturation vapor pressure terms; the shared leading constant cancels in the ratio
    dewpoint_term = math.exp((17.67 * dewpoint) / (243.5 + dewpoint))
    temperature_term = math.exp((17.67 * temperature) / (243.5 + temperature))
    return dewpoint_term / temperature_term

Calculate the relative humidity as a 0 to 1 percentage.

def pressure_altitude(pressure: float, altitude: int | float, unit: str = 'inHg') -> int:
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Compute the pressure altitude in feet, rounded to the nearest integer.

    A pressure given with unit "hPa" is converted to inHg before use.
    """
    inches = pressure * 0.02953 if unit == "hPa" else pressure
    return round((29.92 - inches) * 1000 + altitude)

Calculate the pressure altitude in feet. Converts pressure units.

def density_altitude( pressure: float, temperature: int | float, altitude: int | float, units: avwx.structs.Units) -> int:
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Compute the density altitude in feet, rounded to the nearest integer.

    Temperature is converted from Fahrenheit when units.temperature is "F" and
    pressure is converted from hPa when units.altimeter is "hPa".
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    inches = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    # Reference temperature used here: 15 degrees minus 2 degrees per 1000 of altitude
    reference = 15 - (2 * altitude / 1000)
    deviation = (celsius - reference) * 120
    return round(deviation + pressure_altitude(inches, altitude))

Calculate the density altitude in feet. Converts pressure and temperature units.

def get_station_and_time(data: list[str]) -> tuple[list[str], str | None, str | None]:
def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Pop the station ident and, if present, the time element off the front of the report.

    The list is modified in place and also returned. A six digit time that is
    missing its trailing "Z" is returned with the "Z" appended.
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    report_time: str | None = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            report_time = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            report_time = f"{data.pop(0)}Z"
    return data, station, report_time

Return the report list and removed station ident and time strings.

def is_wind(text: str) -> bool:
def is_wind(text: str) -> bool:
    """Decide whether the text looks like a standard wind element."""
    # Wind shear elements are never treated as wind
    if text.startswith("WS"):
        return False
    # With a unit suffix: 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            if not text.endswith(ending):
                continue
            unit_index = text.find(ending)
            if text[unit_index - 2 : unit_index].isdigit():
                return True
    # Without a unit suffix: 09010  09010G15 VRB10
    size = len(text)
    gust_shaped = size >= 8 and "G" in text and "/" not in text
    if size != 5 and not gust_shaped:
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()

Return True if the text is likely a normal wind element.

VARIABLE_DIRECTION_PATTERN = re.compile('\\d{3}V\\d{3}')
def is_variable_wind_direction(text: str) -> bool:
def is_variable_wind_direction(text: str) -> bool:
    """Check whether the first seven characters match the variable direction pattern, e.g. 350V040."""
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None

Return True if element looks like 350V040.

def separate_wind(text: str) -> tuple[str, str, str]:
def separate_wind(text: str) -> tuple[str, str, str]:
    """Split a wind element into (direction, speed, gust) strings.

    Parts that are not present come back as empty strings.
    """
    gust = ""
    g_index = text.find("G")
    if g_index > -1:
        # A gust is two characters wide, or three for "greater than": 16006GP99KT
        width = 3 if "GP" in text else 2
        gust_start = g_index + 1
        gust_end = gust_start + width
        gust = text[gust_start:gust_end]
        text = text[:g_index] + text[gust_end:]
    # Speed with no direction: 10G18KT
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust

Extract the direction, speed, and gust from a wind element.

def get_wind( data: list[str], units: avwx.structs.Units) -> tuple[list[str], avwx.structs.Number | None, avwx.structs.Number | None, avwx.structs.Number | None, list[avwx.structs.Number]]:
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Pull the wind elements off the front of the report.

    Returns the remaining report list followed by the direction, speed, and
    gust values produced by make_number, and the list of variable direction
    bounds. Consumed elements are removed from data in place, and
    units.wind_speed is updated when the wind element carries a known unit.
    """
    direction = speed = gust = ""
    variable: list[Number] = []
    # Main wind element: strip the unit, then split into its parts
    if data and is_wind(data[0]):
        element = data[0]
        suffix = next((key for key in WIND_UNITS if element.endswith(key)), None)
        if suffix is not None:
            units.wind_speed = WIND_UNITS[suffix]
            element = element.replace(suffix, "")
        direction, speed, gust = separate_wind(element)
        data.pop(0)
    # Gust reported as its own element, e.g. G15
    if data:
        candidate = data[0]
        if 1 < len(candidate) < 4 and candidate[0] == "G" and candidate[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable wind direction, e.g. 350V040
    if data and is_variable_wind_direction(data[0]):
        bounds = (make_number(part, speak=part, literal=True) for part in data.pop(0).split("V"))
        variable.extend(bound for bound in bounds if bound is not None)
    # Convert the collected strings
    direction_value = make_number(direction, speak=direction, literal=True)
    speed_value = make_number(speed.strip("BV"), m_minus=False)
    gust_value = make_number(gust, m_minus=False)
    return data, direction_value, speed_value, gust_value, variable

Return the report list, the parsed direction, speed, and gust values, and the variable direction list.

def get_visibility( data: list[str], units: avwx.structs.Units) -> tuple[list[str], avwx.structs.Number | None]:
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Pull the visibility element off the front of the report.

    Returns the remaining report list and the result of make_number for the
    extracted value. Consumed elements are removed from data in place and
    units.visibility is set to "sm" or "m" to match the form that was found.
    """
    visibility = ""
    if not data:
        return data, make_number(visibility, m_minus=False)
    token = data[0]
    size = len(token)
    # Statute miles: 10SM, 1/2SM
    if token.endswith("SM"):
        amount = token[:-2]
        if amount.isdigit():
            visibility = str(int(amount))
        elif "/" in token:
            visibility = token[: token.find("SM")]
        else:
            visibility = amount
        data.pop(0)
        units.visibility = "sm"
    # Meters as four plain digits
    elif size == 4 and token.isdigit():
        visibility = data.pop(0)
        units.visibility = "m"
    # Meters followed by a single letter suffix or NDV
    elif 5 <= size <= 7 and token[:4].isdigit() and (token[4] in ("M", "N", "S", "E", "W") or token[4:] == "NDV"):
        visibility = data.pop(0)[:4]
        units.visibility = "m"
    # Meters with a single letter prefix
    elif size == 5 and token[1:].isdigit() and token[0] in ("M", "P", "B"):
        visibility = data.pop(0)[1:]
        units.visibility = "m"
    # Kilometers, expressed as meters
    elif token.endswith("KM"):
        visibility = f"{token[:-2]}000"
        data.pop(0)
        units.visibility = "m"
    # Statute miles split over two elements Ex: 2 1/2SM
    elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and token.isdigit():
        whole = data.pop(0)  # 2
        fraction = data.pop(0).replace("SM", "")  # 1/2
        numerator = int(whole) * int(fraction[2]) + int(fraction[0])
        visibility = f"{numerator}{fraction[1:]}"  # 5/2
        units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)

Return the report list and removed visibility string.

def sanitize_cloud(cloud: str) -> str:
def sanitize_cloud(cloud: str) -> str:
    """Repair uncommon formatting problems in a cloud layer string."""
    if len(cloud) < 4:
        return cloud
    prefix, fourth, rest = cloud[:3], cloud[3], cloud[4:]
    # Nothing to repair when the fourth character already fits the layout
    if fourth.isdigit() or fourth in ("/", "-"):
        return cloud
    # Letter "O" typed instead of zero: FEWO03 -> FEW003
    if fourth == "O":
        return f"{prefix}0{rest}"
    # Misplaced modifier goes to the end: BKNC015 -> BKN015C
    if fourth != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return prefix + rest + fourth
    return cloud

Fix rare cloud layer issues.

def make_cloud(cloud: str) -> avwx.structs.Cloud:
def make_cloud(cloud: str) -> Cloud:
    """Build a Cloud dataclass from a cloud layer string.

    The text is split into type, base, top, and trailing modifier. The input
    is expected to already resemble a cloud layer; it is not validated here.
    """
    remainder = sanitize_cloud(cloud).replace("/", "")
    layer_type = ""
    base: str | None = None
    top: str | None = None
    # Everything after the first matching top marker is the top
    for marker in _TOP_OFFSETS:
        position = remainder.find(marker)
        if position < 0:
            continue
        top = remainder[position + len(marker) :]
        remainder = remainder[:position]
        break
    # Leading type
    for label in ("BASES", "BASE"):
        ## BASE027
        if remainder.startswith(label):
            remainder = remainder[len(label) :]
            break
    else:
        ## VV003
        if remainder.startswith("VV"):
            layer_type, remainder = remainder[:2], remainder[2:]
        ## FEW010
        elif len(remainder) >= 3 and remainder[:3] in CLOUD_LIST:
            layer_type, remainder = remainder[:3], remainder[3:]
    ## BKN-OVC065
    if len(remainder) > 4 and remainder[0] == "-" and remainder[1:4] in CLOUD_LIST:
        layer_type += remainder[:4]
        remainder = remainder[4:]
    # Base is three digits; an UNKN placeholder is dropped
    if len(remainder) >= 3 and remainder[:3].isdigit():
        base, remainder = remainder[:3], remainder[3:]
    elif len(remainder) >= 4 and remainder.startswith("UNKN"):
        remainder = remainder[4:]
    # Whatever is left over is kept as the modifier
    return Cloud(
        cloud,
        layer_type or None,
        _null_or_int(base),
        _null_or_int(top),
        remainder or None,
    )

Return a Cloud dataclass for a cloud string.

This function assumes the input is potentially valid.

def get_clouds(data: list[str]) -> tuple[list[str], list]:
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Extract every cloud layer element from the report.

    Returns the remaining report list (modified in place) and the list of
    layers built by make_cloud.
    """
    layers = []
    # Walk from the back so that popping never shifts an index still to visit
    for index in range(len(data) - 1, -1, -1):
        token = data[index]
        if token[:3] in CLOUD_LIST or token[:2] == "VV":
            layers.append(make_cloud(data.pop(index)))
    # Sorting raises TypeError when a None is compared
    try:
        layers.sort(key=lambda layer: (layer.base, layer.type))
    except TypeError:
        layers.reverse()  # Back to the order found in the report
    return data, layers

Return the report list and removed list of split cloud layers.

def get_flight_rules( visibility: avwx.structs.Number | None, ceiling: avwx.structs.Cloud | None) -> int:
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Classify flight rules from parsed visibility and ceiling.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Reduce visibility to a single comparable number
    vis: _Numeric
    if visibility is None:
        vis = 2
    else:
        shown = visibility.repr
        if shown == "CAVOK" or shown.startswith("P6"):
            vis = 10
        elif shown.startswith("M"):
            vis = 0
        elif visibility.value is None:
            vis = 2
        else:
            vis = visibility.value or 0
            # A four character repr is treated as meters and converted to miles
            if len(shown) == 4:
                vis = vis * 0.000621371
    # A missing ceiling or missing base counts as 99
    cld = (ceiling.base or 99) if ceiling else 99
    # Most restrictive category first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR

Return int based on current flight rules from parsed METAR data.

0=VFR, 1=MVFR, 2=IFR, 3=LIFR

Note: Common practice is to report no higher than IFR if visibility unavailable.

def get_ceiling(clouds: list[avwx.structs.Cloud]) -> avwx.structs.Cloud | None:
def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Pick the first layer in the list that counts as a ceiling.

    The list is expected to be ordered lowest to highest already.

    A layer counts only when its type is 'BKN', 'OVC', or 'VV' and its base is
    truthy, so layers without usable base information (eg. 'FEW///') are
    passed over. Returns None when no layer qualifies.
    """
    ceiling_types = {"OVC", "BKN", "VV"}
    for layer in clouds:
        if layer.base and layer.type in ceiling_types:
            return layer
    return None

Return ceiling layer from Cloud-List or None if none found.

Assumes that the clouds are already sorted lowest to highest.

Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

Prevents errors due to lack of cloud information (eg. '' or 'FEW///')

def is_altitude(value: str) -> bool:
564def is_altitude(value: str) -> bool:
565    """Return True if the value is a possible altitude."""
566    if len(value) < 5:
567        return False
568    if value.startswith("SFC/"):
569        return True
570    if value.startswith("FL") and value[2:5].isdigit():
571        return True
572    first, *_ = value.split("/")
573    return bool(first[-2:] == "FT" and first[-5:-2].isdigit())

Return True if the value is a possible altitude.

def make_altitude( value: str, units: avwx.structs.Units, repr: str | None = None, *, force_fl: bool = False) -> tuple[avwx.structs.Number | None, avwx.structs.Units]:
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing "FT" or "M" is removed, recorded on units.altitude (lowercased),
    and disables force_fl. A value such as "F430" is rewritten as "FL430", and
    when force_fl is still set the value is prefixed with "FL" if it does not
    already start with it. The result is passed to make_number, with repr
    (or the original value) used as its repr.

    Returns (None, units) when value is empty, or when nothing is left once
    the unit suffix has been removed (e.g. a bare "FT").
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            force_fl = False
            units.altitude = end.lower()
            value = value.removesuffix(end)
    # A unit-only element leaves an empty string; indexing it below would raise IndexError
    if not value:
        return None, units
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units

Convert altitude string into a number.

def parse_date( date: str, hour_threshold: int = 200, *, time_only: bool = False, target: datetime.date | None = None) -> datetime.datetime | None:
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, assumes hhmm format with current or previous day.

    This function assumes the given timestamp is within the hour threshold from current date.

    The reference point is midnight UTC of target when one is given, otherwise
    the current UTC time.

    Returns None when the text (after removing "Z") is not all digits, has an
    unexpected length, or produces an invalid date.
    """
    # Format date string
    date = date.strip("Z")
    if not date.isdigit():
        return None
    if time_only:
        if len(date) != 4:
            return None
        # hhmm: the hour starts at the first character
        index_hour = 0
    else:
        # ddhh is padded with zero minutes to become ddhhmm
        if len(date) == 4:
            date += "00"
        if len(date) != 6:
            return None
        # ddhhmm: the hour follows the two day digits
        index_hour = 2
    # Create initial guess
    if target:
        target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        target = dt.datetime.now(tz=dt.timezone.utc)
    day = target.day if time_only else int(date[:2])
    hour = int(date[index_hour : index_hour + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = False
    if day > monthrange(target.year, target.month)[1]:
        target += relativedelta(months=-1)
        shifted = True
    try:
        # Hour and minute are wrapped into range; an hour above 23 is corrected below
        guess = target.replace(
            day=day,
            hour=hour % 24,
            minute=int(date[index_hour + 2 : index_hour + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        # The day does not exist in the reference month
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not shifted:
        # Signed distance from the reference point to the guess, in hours
        hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            # Too far ahead of the reference: move the guess back a month
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            # Too far behind the reference: move the guess forward a month
            guess += relativedelta(months=+1)
    return guess

Parse a report timestamp in ddhhZ or ddhhmmZ format.

If time_only, assumes hhmm format with current or previous day.

This function assumes the given timestamp is within the hour threshold from current date.

def make_timestamp( timestamp: str | None, *, time_only: bool = False, target_date: datetime.date | None = None) -> avwx.structs.Timestamp | None:
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Build a Timestamp dataclass from a ddhhZ or ddhhmmZ report timestamp.

    Returns None when timestamp is empty or None. Otherwise the Timestamp
    holds the original text together with whatever parse_date produced.
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None

Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.

def is_runway_visibility(item: str) -> bool:
def is_runway_visibility(item: str) -> bool:
    """Decide whether the item is a runway visibility range string."""
    if len(item) <= 4 or item[0] != "R":
        return False
    # The separator sits right after a two or three character runway ident
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 is a runway state report, not visibility
    return item[1:3].isdigit() and "CLRD" not in item

Return True if the item is a runway visibility range string.