avwx.parsing.core

Contains the core parsing and indent functions of avwx.

  1"""Contains the core parsing and indent functions of avwx."""
  2
  3# stdlib
  4from __future__ import annotations
  5
  6import datetime as dt
  7import math
  8import re
  9from calendar import monthrange
 10from contextlib import suppress
 11from copy import copy
 12from typing import TYPE_CHECKING, Any
 13
 14# library
 15from dateutil.relativedelta import relativedelta
 16
 17# module
 18from avwx.static.core import (
 19    CARDINALS,
 20    CLOUD_LIST,
 21    FRACTIONS,
 22    NUMBER_REPL,
 23    SPECIAL_NUMBERS,
 24    WIND_UNITS,
 25)
 26from avwx.structs import Cloud, Fraction, Number, Timestamp, Units
 27
 28if TYPE_CHECKING:
 29    from collections.abc import Iterable
 30
 31
 32def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
 33    """Deduplicate a list while keeping order.
 34
 35    If only_neighbors is True, dedupe will only check neighboring values.
 36    """
 37    ret: list[Any] = []
 38    for item in items:
 39        if (only_neighbors and ret and ret[-1] != item) or item not in ret:
 40            ret.append(item)
 41    return ret
 42
 43
 44def is_unknown(value: str) -> bool:
 45    """Return True if val represents and unknown value."""
 46    if not isinstance(value, str):
 47        raise TypeError
 48    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
 49        return True
 50    for char in value:
 51        if char not in ("/", "X", "."):
 52            break
 53    else:
 54        return True
 55    return False
 56
 57
 58def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
 59    """Return a list of items removed from a given list of strings
 60    that are all digits from 'from_index' until hitting a non-digit item.
 61    """
 62    ret = []
 63    data.pop(from_index)
 64    while len(data) > from_index and data[from_index].isdigit():
 65        ret.append(data.pop(from_index))
 66    return data, ret
 67
 68
 69def unpack_fraction(num: str) -> str:
 70    """Return unpacked fraction string 5/2 -> 2 1/2."""
 71    numbers = [int(n) for n in num.split("/") if n]
 72    if len(numbers) != 2 or numbers[0] <= numbers[1]:
 73        return num
 74    numerator, denominator = numbers
 75    over = numerator // denominator
 76    rem = numerator % denominator
 77    return f"{over} {rem}/{denominator}"
 78
 79
 80def remove_leading_zeros(num: str) -> str:
 81    """Strip zeros while handling -, M, and empty strings."""
 82    if not num:
 83        return num
 84    if num.startswith("M"):
 85        ret = "M" + num[1:].lstrip("0")
 86    elif num.startswith("-"):
 87        ret = "-" + num[1:].lstrip("0")
 88    else:
 89        ret = num.lstrip("0")
 90    return "0" if ret in ("", "M", "-") else ret
 91
 92
 93SPOKEN_POSTFIX = (
 94    (" zero zero zero", " thousand"),
 95    (" zero zero", " hundred"),
 96)
 97
 98
 99def spoken_number(num: str, *, literal: bool = False) -> str:
100    """Return the spoken version of a number.
101
102    If literal, no conversion to hundreds/thousands
103
104    Ex: 1.2 -> one point two
105        1 1/2 -> one and one half
106        25000 -> two five thousand
107    """
108    ret = []
109    for part in num.split():
110        if part in FRACTIONS:
111            ret.append(FRACTIONS[part])
112        else:
113            val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
114            if not literal:
115                for target, replacement in SPOKEN_POSTFIX:
116                    if val.endswith(target):
117                        val = val[: -len(target)] + replacement
118            ret.append(val)
119    return " and ".join(ret)
120
121
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a Fraction dataclass for a number string containing a single '/'.

    Only the last character before the '/' is treated as the numerator; any
    characters ahead of it are a whole-number part that gets folded into an
    improper fraction. speak_prefix is prepended to the spoken text.
    """
    top_text, bottom_text = num.split("/")
    # 2-1/2 but not -2 1/2: an inner dash only separates the whole part
    if "-" in top_text and not top_text.startswith("-"):
        top_text = top_text.replace("-", " ")
    denominator = int(bottom_text)
    if len(top_text) == 1:
        numerator = int(top_text)
    elif len(top_text) > 1:
        # Fold the leading whole number into the numerator
        whole, single = top_text[:-1], top_text[-1]
        numerator = int(whole) * denominator + int(single)
        num = f"{numerator}/{denominator}"
    else:
        numerator = int(top_text)
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)
145
146
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    Returns None when num is empty, is_unknown(num) is True, or nothing is
    left to parse once the recognised prefixes have been removed.

    repr overrides the stored representation, except that a value ending in
    "SM" always uses its own full text. speak overrides the text handed to
    spoken_number for plain (non-fraction) numbers.

    If literal, spoken string will not convert to hundreds/thousands.

    special is consulted before SPECIAL_NUMBERS; an entry may be a bare value
    or a (value, spoken) tuple.

    If m_minus, an "M" inside the text is read as a minus sign. Otherwise a
    leading "M" / "P" means less than / greater than, and the returned value
    is set to None when the final repr starts with either letter.

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special
    # A missing key raises KeyError from SPECIAL_NUMBERS, which is suppressed
    # so that parsing continues below. A falsy entry in special also falls
    # through to SPECIAL_NUMBERS because of the "or".
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction
    # The cardinal text becomes the repr and its CARDINALS entry the number
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes
    # This overwrites any repr passed in by the caller
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE: these clean-ups change num only; val_str picks them up solely
    # through the m_minus branch below
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        # Drop anything ahead of the first minus sign: 0-04 -> -04
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes
    # Each recognised prefix is removed in turn and adds to the spoken text
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        # Flight levels are always spoken digit by digit
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        val_str = val_str.replace(",", "")
        # Overwrite float 0 due to "0.0" literal
        # A decimal point in num selects float parsing; 0.0 becomes int 0
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret
229
230
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the lowest index at which any item of str_list occurs in txt.

    Returns -1 when none of the items occur.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = [position for position in map(txt.find, str_list) if position > -1]
    return min(positions, default=-1)
241
242
def is_timestamp(item: str) -> bool:
    """Return True if the item is six digits followed by 'Z'."""
    digits, suffix = item[:-1], item[-1:]
    return len(item) == 7 and suffix == "Z" and digits.isdigit()
246
247
def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF to-from time range such as 0106/0212."""
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()
251
252
def is_possible_temp(temp: str) -> bool:
    """Return True if every character is a digit or 'M' (minus).

    An empty string has no offending characters and so returns True.
    """
    return not any(char != "M" and not char.isdigit() for char in temp)
256
257
258_Numeric = int | float
259
260
261def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
262    """Calculate the relative humidity as a 0 to 1 percentage."""
263
264    def saturation(value: _Numeric) -> float:
265        """Return the saturation vapor pressure without the C constant for humidity calc."""
266        return math.exp((17.67 * value) / (243.5 + value))
267
268    if unit == "F":
269        dewpoint = (dewpoint - 32) * 5 / 9
270        temperature = (temperature - 32) * 5 / 9
271    return saturation(dewpoint) / saturation(temperature)
272
273
274# https://aviation.stackexchange.com/questions/47971/how-do-i-calculate-density-altitude-by-hand
275
276
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude in feet, rounded to an int.

    A pressure given in hPa is first converted to inHg. The result is the
    altitude plus 1000 times the difference between 29.92 and the pressure.
    """
    in_hg = pressure * 0.02953 if unit == "hPa" else pressure
    return round((29.92 - in_hg) * 1000 + altitude)
282
283
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude in feet, rounded to an int.

    Temperature is converted to Celsius when units.temperature is "F", and
    pressure to inHg when units.altimeter is "hPa". The pressure altitude is
    then adjusted by 120 per degree of difference from the reference
    temperature of 15 minus 2 per 1000 of altitude.
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    in_hg = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    base_altitude = pressure_altitude(in_hg, altitude)
    reference_temp = 15 - (2 * altitude / 1000)
    return round(((celsius - reference_temp) * 120) + base_altitude)
293
294
295def get_station_and_time(
296    data: list[str],
297) -> tuple[list[str], str | None, str | None]:
298    """Return the report list and removed station ident and time strings."""
299    if not data:
300        return data, None, None
301    station = data.pop(0)
302    if not data:
303        return data, station, None
304    q_time, r_time = data[0], None
305    if data and q_time.endswith("Z") and q_time[:-1].isdigit():
306        r_time = data.pop(0)
307    elif data and len(q_time) == 6 and q_time.isdigit():
308        r_time = f"{data.pop(0)}Z"
309    return data, station, r_time
310
311
def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element."""
    # Wind shear elements are handled elsewhere
    if text.startswith("WS"):
        return False
    # With a unit suffix: 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            if not text.endswith(ending):
                continue
            unit_index = text.find(ending)
            if text[unit_index - 2 : unit_index].isdigit():
                return True
    # Without a unit: 09010  09010G15 VRB10
    unitless_shape = len(text) == 5 or (len(text) >= 8 and "G" in text and "/" not in text)
    if not unitless_shape:
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()
327
328
VARIABLE_DIRECTION_PATTERN = re.compile(r"\d{3}V\d{3}")


def is_variable_wind_direction(text: str) -> bool:
    """Return True if the first seven characters look like 350V040."""
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
337
338
def separate_wind(text: str) -> tuple[str, str, str]:
    """Extract the direction, speed, and gust from a wind element.

    The gust is the two characters after the first "G" (three when the text
    contains "GP", ie gust greater than: 16006GP99KT). Of the remaining text,
    a two-character value is speed only (10G18KT); otherwise the first three
    characters are the direction and the rest is the speed.
    """
    gust = ""
    gust_at = text.find("G")
    if gust_at > -1:
        width = 3 if "GP" in text else 2
        gust_end = gust_at + 1 + width
        gust = text[gust_at + 1 : gust_end]
        # Cut the gust portion out of the element
        text = text[:gust_at] + text[gust_end:]
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust
359
360
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Return the report list plus the wind direction, speed, gust, and variable direction list.

    Consumed elements are popped from the front of data. The direction, speed,
    and gust are converted with make_number. units.wind_speed is updated when
    the wind element ends with a key of WIND_UNITS.
    """
    direction = speed = gust = ""
    variable: list[Number] = []
    # Main wind element: drop the unit text, then split it apart
    if data and is_wind(data[0]):
        wind = data.pop(0)
        for suffix, unit in WIND_UNITS.items():
            if wind.endswith(suffix):
                units.wind_speed = unit
                wind = wind.replace(suffix, "")
                break
        direction, speed, gust = separate_wind(wind)
    # Separated Gust: a standalone element such as G25
    if data:
        token = data[0]
        if 1 < len(token) < 4 and token[0] == "G" and token[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable Wind Direction: 350V040 yields one Number per side
    if data and is_variable_wind_direction(data[0]):
        bounds = data.pop(0).split("V")
        parsed = (make_number(bound, speak=bound, literal=True) for bound in bounds)
        variable.extend(value for value in parsed if value is not None)
    # Convert to Number
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )
398
399
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Return the report list and the visibility removed from its front.

    Only the first element (and the second, for a split value such as
    "2 1/2SM") is examined. A recognised element is popped from data,
    units.visibility is set to "sm" or "m", and the extracted text is
    converted with make_number(..., m_minus=False). When nothing matches,
    data is left untouched and the result of make_number("") is returned.
    """
    visibility = ""
    if data:
        item = copy(data[0])
        # Vis reported in statute miles
        if item.endswith("SM"):  # 10SM
            if item[:-2].isdigit():
                # int() round-trip drops leading zeros
                visibility = str(int(item[:-2]))
            elif "/" in item:
                visibility = item[: item.find("SM")]  # 1/2SM
            else:
                visibility = item[:-2]
            data.pop(0)
            units.visibility = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            units.visibility = "m"
        # Four digits followed by a direction letter (or M) or by NDV; only the digits are kept
        elif 7 >= len(item) >= 5 and item[:4].isdigit() and (item[4] in ["M", "N", "S", "E", "W"] or item[4:] == "NDV"):
            visibility = data.pop(0)[:4]
            units.visibility = "m"
        # Four digits behind a single M/P/B prefix letter; the prefix is dropped
        elif len(item) == 5 and item[1:].isdigit() and item[0] in ["M", "P", "B"]:
            visibility = data.pop(0)[1:]
            units.visibility = "m"
        # Kilometers are rewritten as meters by appending three zeros
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit():
            vis1 = data.pop(0)  # 2
            vis2 = data.pop(0).replace("SM", "")  # 1/2
            # Fold the whole number into an improper fraction; indexes 0 and 2
            # assume a single-digit numerator and denominator
            visibility = str(int(vis1) * int(vis2[2]) + int(vis2[0])) + vis2[1:]  # 5/2
            units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)
436
437
def sanitize_cloud(cloud: str) -> str:
    """Fix rare cloud layer issues keyed off the fourth character.

    A letter "O" in that position is replaced with a zero (FEWO03 -> FEW003).
    Any other stray character is moved to the end (BKNC015 -> BKN015C),
    except for "U" and for text starting with BASE or UNKN.
    """
    if len(cloud) < 4:
        return cloud
    marker = cloud[3]
    # Digits and the "/" and "-" separators are already where they belong
    if marker.isdigit() or marker in ("/", "-"):
        return cloud
    if marker == "O":
        return f"{cloud[:3]}0{cloud[4:]}"
    if marker != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return cloud[:3] + cloud[4:] + marker
    return cloud
450
451
def _null_or_int(val: str | None) -> int | None:
    """Convert a string to an int, mapping non-strings and unknown values to None."""
    if isinstance(val, str) and not is_unknown(val):
        return int(val)
    return None
455
456
# Markers that introduce a cloud top value; the longer form is listed first
# so that "-TOPS" is not matched as "-TOP" with a stray "S" left over
_TOP_OFFSETS = ("-TOPS", "-TOP")


def make_cloud(cloud: str) -> Cloud:
    """Return a Cloud dataclass for a cloud string.

    The text is first passed through sanitize_cloud and stripped of "/".
    It is then consumed left to right: an optional top after a -TOP/-TOPS
    marker, the type, a three-digit base, and whatever remains as the
    modifier. The original text is kept as the Cloud's first field.

    This function assumes the input is potentially valid.
    """
    raw_cloud = cloud
    cloud_type = ""
    base: str | None = None
    top: str | None = None
    cloud = sanitize_cloud(cloud).replace("/", "")
    # Separate top
    # Everything after the marker is the top; everything before it is parsed below
    for target in _TOP_OFFSETS:
        topi = cloud.find(target)
        if topi > -1:
            top, cloud = cloud[topi + len(target) :], cloud[:topi]
            break
    # Separate type
    ## BASE027
    # BASE/BASES prefixes are dropped and leave the type empty
    if cloud.startswith("BASES"):
        cloud = cloud[5:]
    elif cloud.startswith("BASE"):
        cloud = cloud[4:]
    ## VV003
    elif cloud.startswith("VV"):
        cloud_type, cloud = cloud[:2], cloud[2:]
    ## FEW010
    elif len(cloud) >= 3 and cloud[:3] in CLOUD_LIST:
        cloud_type, cloud = cloud[:3], cloud[3:]
    ## BKN-OVC065
    # A second, dash-joined type is appended to the first, dash included
    if len(cloud) > 4 and cloud[0] == "-" and cloud[1:4] in CLOUD_LIST:
        cloud_type += cloud[:4]
        cloud = cloud[4:]
    # Separate base
    if len(cloud) >= 3 and cloud[:3].isdigit():
        base, cloud = cloud[:3], cloud[3:]
    # An explicit UNKN base is removed and the base stays None
    elif len(cloud) >= 4 and cloud[:4] == "UNKN":
        cloud = cloud[4:]
    # Remainder is considered modifiers
    modifier = cloud or None
    # Make Cloud
    return Cloud(raw_cloud, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier)
501
502
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Return the report list and removed list of split cloud layers.

    Every element whose first three characters are in CLOUD_LIST, or which
    starts with "VV", is popped from data and converted with make_cloud.

    The layers are returned sorted by (base, type). If that ordering cannot
    be computed because a compared value is None, the layers are returned in
    their original report order instead.
    """
    clouds = []
    # Walk backwards so that popping never shifts an index still to be visited
    for i in range(len(data) - 1, -1, -1):
        item = data[i]
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(i)))
    # Attempt cloud sort. Fails if None values are present
    try:
        # sorted() builds a new list, so a comparison that fails part-way
        # cannot leave the collected layers half-reordered the way an
        # in-place list.sort() can
        clouds = sorted(clouds, key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        # Layers were collected back to front; this restores report order
        clouds.reverse()
    return data, clouds
516
517
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return int based on current flight rules from parsed METAR data.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    The category is the more restrictive of the visibility and the ceiling
    base. A missing ceiling, or one without a base, places no restriction.
    A base of 0 is a real ceiling and is treated as the lowest possible one.

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Parse visibility
    vis: _Numeric
    if visibility is None:
        vis = 2
    elif visibility.repr == "CAVOK" or visibility.repr.startswith("P6"):
        vis = 10
    elif visibility.repr.startswith("M"):
        vis = 0
    elif visibility.value is None:
        vis = 2
    # Convert meters to miles
    elif len(visibility.repr) == 4:
        vis = (visibility.value or 0) * 0.000621371
    else:
        vis = visibility.value or 0
    # Parse ceiling
    # Test for None explicitly: a truthiness check would discard a base of 0
    cld = 99 if ceiling is None or ceiling.base is None else ceiling.base
    # Determine flight rules, most restrictive category first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR


def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return ceiling layer from Cloud-List or None if none found.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (eg. '' or 'FEW///'). A base of 0 is a valid ceiling.
    """
    for layer in clouds:
        # "is not None" rather than truthiness so that a base of 0 still counts
        if layer.base is not None and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None
562
563
def is_altitude(value: str) -> bool:
    """Return True if the value is a possible altitude.

    Accepts text of at least five characters that starts with "SFC/", starts
    with "FL" plus three digits, or whose part before the first "/" ends in
    "FT" preceded by digits.
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/") or (value.startswith("FL") and value[2:5].isdigit()):
        return True
    lower = value.split("/")[0]
    return lower.endswith("FT") and lower[-5:-2].isdigit()
574
575
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing "FT" or "M" is removed, recorded in units.altitude, and cancels
    force_fl. A value written as "F" plus digits is read as a flight level
    (F430 -> FL430). With force_fl, a value without an "FL" prefix gets one.

    Returns (None, units) when the value is empty, including when nothing
    but a unit suffix was given. The original text (or repr) is used as the
    representation of the resulting number.
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            force_fl = False
            units.altitude = end.lower()
            value = value.removesuffix(end)
    # A bare unit such as "FT" leaves nothing to parse; bail out before
    # indexing into the empty string below
    if not value:
        return None, units
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units
598
599
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, assumes hhmm format with current or previous day.

    The year and month come from target (taken as midnight UTC of that date)
    or, when target is not given, from the current UTC time. The result is a
    timezone-aware UTC datetime.

    Returns None when the text is not all digits once "Z" is stripped, has
    the wrong length, or names a day that cannot be placed in the month.

    This function assumes the given timestamp is within the hour threshold from current date.
    """
    # Format date string
    date = date.strip("Z")
    if not date.isdigit():
        return None
    if time_only:
        if len(date) != 4:
            return None
        index_hour = 0
    else:
        # ddhh is padded with zero minutes to match ddhhmm
        if len(date) == 4:
            date += "00"
        if len(date) != 6:
            return None
        index_hour = 2
    # Create initial guess
    if target:
        target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        target = dt.datetime.now(tz=dt.timezone.utc)
    day = target.day if time_only else int(date[:2])
    hour = int(date[index_hour : index_hour + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = False
    if day > monthrange(target.year, target.month)[1]:
        target += relativedelta(months=-1)
        shifted = True
    try:
        # Out-of-range hours and minutes are wrapped rather than rejected;
        # an hour of 24 or more is carried into the next day below
        guess = target.replace(
            day=day,
            hour=hour % 24,
            minute=int(date[index_hour + 2 : index_hour + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        # The day does not exist in the target month (eg. day 00)
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not shifted:
        # Signed distance from target to guess, in hours
        hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            guess += relativedelta(months=+1)
    return guess
661
662
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.

    Returns None for an empty or missing timestamp. Otherwise the Timestamp
    pairs the original text with the result of parse_date, which may be None.
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None
674
675
def is_runway_visibility(item: str) -> bool:
    """Return True if the item is a runway visibility range string.

    The item must be longer than four characters, start with "R" and two
    digits, and have a "/" as its fourth or fifth character.
    """
    if len(item) <= 4 or item[0] != "R":
        return False
    # The separator follows a two-digit runway, with or without a suffix letter
    if "/" not in item[3:5]:
        return False
    # R28/CLRD70 is a Runway State, not a visibility range
    return item[1:3].isdigit() and "CLRD" not in item
def dedupe( items: Iterable[typing.Any], *, only_neighbors: bool = False) -> list[typing.Any]:
def dedupe(items: Iterable[Any], *, only_neighbors: bool = False) -> list[Any]:
    """Return the items with duplicates dropped, keeping first-seen order.

    An item is always kept the first time it is seen. When only_neighbors is
    True, a repeated item is also kept whenever it differs from the most
    recently kept item, so only back-to-back repeats are removed.
    """
    unique: list[Any] = []
    for candidate in items:
        if candidate not in unique:
            unique.append(candidate)
        elif only_neighbors and unique[-1] != candidate:
            unique.append(candidate)
    return unique

Deduplicate a list while keeping order.

If only_neighbors is True, dedupe will only check neighboring values.

def is_unknown(value: str) -> bool:
def is_unknown(value: str) -> bool:
    """Return True if value represents an unknown value.

    Empty strings, the markers UNKN/UNK/UKN (any case), and strings made up
    only of '/', 'X', and '.' count as unknown. Raises TypeError for non-strings.
    """
    if not isinstance(value, str):
        raise TypeError
    if not value:
        return True
    if value.upper() in {"UNKN", "UNK", "UKN"}:
        return True
    # Placeholder text such as "////" or "XX" carries no value
    return all(char in ("/", "X", ".") for char in value)

Return True if value represents an unknown value.

def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
def get_digit_list(data: list[str], from_index: int) -> tuple[list[str], list[str]]:
    """Remove and collect the run of all-digit items that follows 'from_index'.

    The item at 'from_index' itself is popped and discarded first. Items are
    then popped from that same position for as long as they are all digits.
    Returns the mutated input list and the list of removed digit items.
    """
    data.pop(from_index)
    digits = []
    while len(data) > from_index:
        if not data[from_index].isdigit():
            break
        digits.append(data.pop(from_index))
    return data, digits

Return a list of items removed from a given list of strings that are all digits from 'from_index' until hitting a non-digit item.

def unpack_fraction(num: str) -> str:
def unpack_fraction(num: str) -> str:
    """Return an improper fraction string as a mixed number: 5/2 -> 2 1/2.

    The input is returned unchanged unless it splits into exactly two integers
    with the numerator greater than the denominator.
    """
    parts = [int(piece) for piece in num.split("/") if piece]
    if len(parts) == 2:
        top, bottom = parts
        if top > bottom:
            whole, left = divmod(top, bottom)
            return f"{whole} {left}/{bottom}"
    return num

Return unpacked fraction string 5/2 -> 2 1/2.

def remove_leading_zeros(num: str) -> str:
def remove_leading_zeros(num: str) -> str:
    """Strip leading zeros, keeping a leading '-' or 'M' sign marker.

    Empty input is returned as-is. If nothing but the sign (or nothing at all)
    remains after stripping, "0" is returned.
    """
    if not num:
        return num
    sign = num[0] if num[0] in ("M", "-") else ""
    stripped = sign + num[len(sign) :].lstrip("0")
    if stripped in ("", "M", "-"):
        return "0"
    return stripped

Strip zeros while handling -, M, and empty strings.

SPOKEN_POSTFIX = ((' zero zero zero', ' thousand'), (' zero zero', ' hundred'))
def spoken_number(num: str, *, literal: bool = False) -> str:
def spoken_number(num: str, *, literal: bool = False) -> str:
    """Return the spoken version of a number.

    The text is split on whitespace. A part found in FRACTIONS uses that
    spoken form; any other part is spelled out character by character using
    NUMBER_REPL, skipping characters that have no entry. Parts are joined
    with " and ".

    If literal, trailing zeros are not converted to hundreds/thousands.

    Ex: 1.2 -> one point two
        1 1/2 -> one and one half
        25000 -> two five thousand
    """
    phrases = []
    for token in num.split():
        if token in FRACTIONS:
            phrases.append(FRACTIONS[token])
            continue
        words = " ".join(NUMBER_REPL[symbol] for symbol in token if symbol in NUMBER_REPL)
        if not literal:
            # Collapse trailing spelled-out zeros into a magnitude word
            for suffix, shorthand in SPOKEN_POSTFIX:
                if words.endswith(suffix):
                    words = words[: -len(suffix)] + shorthand
        phrases.append(words)
    return " and ".join(phrases)

Return the spoken version of a number.

If literal, no conversion to hundreds/thousands

Ex: 1.2 -> one point two; 1 1/2 -> one and one half; 25000 -> two five thousand

def make_fraction( num: str, repr: str | None = None, *, literal: bool = False, speak_prefix: str = '') -> avwx.structs.Fraction:
def make_fraction(
    num: str,
    repr: str | None = None,  # noqa: A002
    *,
    literal: bool = False,
    speak_prefix: str = "",
) -> Fraction:
    """Return a Fraction dataclass for a number string containing a single '/'.

    Only the last character before the '/' is treated as the numerator; any
    characters ahead of it are a whole-number part that gets folded into an
    improper fraction. speak_prefix is prepended to the spoken text.
    """
    top_text, bottom_text = num.split("/")
    # 2-1/2 but not -2 1/2: an inner dash only separates the whole part
    if "-" in top_text and not top_text.startswith("-"):
        top_text = top_text.replace("-", " ")
    denominator = int(bottom_text)
    if len(top_text) == 1:
        numerator = int(top_text)
    elif len(top_text) > 1:
        # Fold the leading whole number into the numerator
        whole, single = top_text[:-1], top_text[-1]
        numerator = int(whole) * denominator + int(single)
        num = f"{numerator}/{denominator}"
    else:
        numerator = int(top_text)
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal=literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)

Return a fraction dataclass for numbers with / in them.

def make_number( num: str | None, repr: str | None = None, speak: str | None = None, *, literal: bool = False, special: dict | None = None, m_minus: bool = True) -> avwx.structs.Number | avwx.structs.Fraction | None:
def make_number(
    num: str | None,
    repr: str | None = None,  # noqa: A002
    speak: str | None = None,
    *,
    literal: bool = False,
    special: dict | None = None,
    m_minus: bool = True,
) -> Number | Fraction | None:
    """Return a Number or Fraction dataclass for a number string.

    Returns None when num is empty, is_unknown(num) is True, or nothing is
    left to parse once the recognised prefixes have been removed.

    repr overrides the stored representation, except that a value ending in
    "SM" always uses its own full text. speak overrides the text handed to
    spoken_number for plain (non-fraction) numbers.

    If literal, spoken string will not convert to hundreds/thousands.

    special is consulted before SPECIAL_NUMBERS; an entry may be a bare value
    or a (value, spoken) tuple.

    If m_minus, an "M" inside the text is read as a minus sign. Otherwise a
    leading "M" / "P" means less than / greater than, and the returned value
    is set to None when the final repr starts with either letter.

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.
    """
    if not num or is_unknown(num):
        return None
    # Check special
    # A missing key raises KeyError from SPECIAL_NUMBERS, which is suppressed
    # so that parsing continues below. A falsy entry in special also falls
    # through to SPECIAL_NUMBERS because of the "or".
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value = item
            spoken = spoken_number(str(value), literal=literal)
        return Number(repr or num, value, spoken)
    # Check cardinal direction
    # The cardinal text becomes the repr and its CARDINALS entry the number
    if num in CARDINALS:
        if not repr:
            repr = num  # noqa: A001
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes
    # This overwrites any repr passed in by the caller
    if val_str.endswith("SM"):
        repr = val_str[:]  # noqa: A001
        val_str = val_str[:-2]
    # Remove spurious characters from the end
    # NOTE: these clean-ups change num only; val_str picks them up solely
    # through the m_minus branch below
    num = num.rstrip("M.")
    num = num.replace("O", "0")
    num = num.replace("+", "")
    num = num.replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        # Drop anything ahead of the first minus sign: 0-04 -> -04
        while val_str[0] != "-":
            val_str = val_str[1:]
    # Check value prefixes
    # Each recognised prefix is removed in turn and adds to the spoken text
    speak_prefix = ""
    if val_str.startswith("ABV "):
        speak_prefix += "above "
        val_str = val_str[4:]
    if val_str.startswith("BLW "):
        speak_prefix += "below "
        val_str = val_str[4:]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        # Flight levels are always spoken digit by digit
        val_str, literal = val_str[2:], True
    if val_str.startswith("M"):
        speak_prefix += "less than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    if val_str.startswith("P"):
        speak_prefix += "greater than "
        repr = repr or val_str  # noqa: A001
        val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Number | Fraction | None = None
    # Create Fraction
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal=literal, speak_prefix=speak_prefix)
    else:
        val_str = val_str.replace(",", "")
        # Overwrite float 0 due to "0.0" literal
        # A decimal point in num selects float parsing; 0.0 becomes int 0
        value = float(val_str) or 0 if "." in num else int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal=literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret

Return a Number or Fraction dataclass for a number string.

If literal, spoken string will not convert to hundreds/thousands.

NOTE: Numerators are assumed to have a single digit. Additional are whole numbers.

def find_first_in_list(txt: str, str_list: list[str]) -> int:
def find_first_in_list(txt: str, str_list: list[str]) -> int:
    """Return the lowest index in txt at which any item of str_list occurs.

    Returns -1 when none of the items can be found.

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = (txt.find(candidate) for candidate in str_list)
    # str.find reports a miss as -1, so only keep real hits
    return min((position for position in positions if position > -1), default=-1)

Return the index of the earliest occurrence of an item from a list in a string.

Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3

def is_timestamp(item: str) -> bool:
def is_timestamp(item: str) -> bool:
    """Return True if the item is six digits followed by 'Z', like 121251Z."""
    if len(item) != 7:
        return False
    digits, suffix = item[:-1], item[-1]
    return suffix == "Z" and digits.isdigit()

Return True if the item matches the timestamp format.

def is_timerange(item: str) -> bool:
def is_timerange(item: str) -> bool:
    """Return True if the item is a TAF to-from time range.

    The expected shape is four digits, a slash, and four more digits.
    """
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()

Return True if the item is a TAF to-from time range.

def is_possible_temp(temp: str) -> bool:
def is_possible_temp(temp: str) -> bool:
    """Return True if every character is a digit or 'M' for minus.

    An empty string has no offending characters and also returns True.
    """
    for char in temp:
        if char != "M" and not char.isdigit():
            return False
    return True

Return True if all characters are digits or 'M' for minus.

def relative_humidity( temperature: int | float, dewpoint: int | float, unit: str = 'C') -> float:
def relative_humidity(temperature: _Numeric, dewpoint: _Numeric, unit: str = "C") -> float:
    """Calculate the relative humidity as a 0 to 1 percentage.

    When unit is "F", both inputs are converted from Fahrenheit to Celsius
    before the calculation. Any other unit value leaves them untouched.
    """
    if unit == "F":
        temperature, dewpoint = ((value - 32) * 5 / 9 for value in (temperature, dewpoint))

    def vapor_term(celsius: _Numeric) -> float:
        """Return the saturation vapor pressure without the C constant for humidity calc."""
        return math.exp((17.67 * celsius) / (243.5 + celsius))

    # The omitted constant is shared by both terms and cancels in the ratio
    return vapor_term(dewpoint) / vapor_term(temperature)

Calculate the relative humidity as a 0 to 1 percentage.

def pressure_altitude(pressure: float, altitude: int | float, unit: str = 'inHg') -> int:
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculate the pressure altitude in feet. Converts pressure units.

    A pressure given with unit "hPa" is scaled by 0.02953 first. Any other
    unit value is used as-is.
    """
    inches = pressure * 0.02953 if unit == "hPa" else pressure
    # Offset from the 29.92 reference, scaled by 1000, on top of the altitude
    return round((29.92 - inches) * 1000 + altitude)

Calculate the pressure altitude in feet. Converts pressure units.

def density_altitude( pressure: float, temperature: int | float, altitude: int | float, units: avwx.structs.Units) -> int:
def density_altitude(pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units) -> int:
    """Calculate the density altitude in feet. Converts pressure and temperature units.

    The temperature is converted when units.temperature is "F", and the
    pressure is scaled by 0.02953 when units.altimeter is "hPa".
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    inches = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    # Reference temperature: 15 at zero altitude, dropping 2 per 1000 of altitude
    reference = 15 - (2 * altitude / 1000)
    deviation = (celsius - reference) * 120
    return round(deviation + pressure_altitude(inches, altitude))

Calculate the density altitude in feet. Converts pressure and temperature units.

def get_station_and_time(data: list[str]) -> tuple[list[str], str | None, str | None]:
def get_station_and_time(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Return the report list and removed station ident and time strings.

    The first element is always taken as the station. The next element is
    only removed when it looks like a time: digits ending in "Z", or exactly
    six digits, in which case the "Z" is appended. The list is mutated in place.
    """
    station: str | None = None
    timestamp: str | None = None
    if data:
        station = data.pop(0)
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            timestamp = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            timestamp = data.pop(0) + "Z"
    return data, station, timestamp

Return the report list and removed station ident and time strings.

def is_wind(text: str) -> bool:
def is_wind(text: str) -> bool:
    """Return True if the text is likely a normal wind element.

    Elements starting with "WS" (wind shear) are always rejected.
    """
    if text.startswith("WS"):
        return False
    length = len(text)
    # With a unit: 09010KT, 09010G15KT
    if length > 4:
        for suffix in WIND_UNITS:
            if not text.endswith(suffix):
                continue
            unit_at = text.find(suffix)
            # The two characters right before the unit must be digits
            if text[unit_at - 2 : unit_at].isdigit():
                return True
    # Without a unit: 09010  09010G15 VRB10
    unitless_shape = length == 5 or (length >= 8 and "G" in text and "/" not in text)
    if not unitless_shape:
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()

Return True if the text is likely a normal wind element.

VARIABLE_DIRECTION_PATTERN = re.compile('\\d{3}V\\d{3}')
def is_variable_wind_direction(text: str) -> bool:
def is_variable_wind_direction(text: str) -> bool:
    """Return True if element looks like 350V040.

    Only the first seven characters are checked; anything after them is ignored.
    """
    head = text[:7]
    return len(head) == 7 and VARIABLE_DIRECTION_PATTERN.match(head) is not None

Return True if element looks like 350V040.

def separate_wind(text: str) -> tuple[str, str, str]:
def separate_wind(text: str) -> tuple[str, str, str]:
    """Extract the direction, speed, and gust from a wind element.

    The text is expected to have had its unit removed already. Missing parts
    come back as empty strings.
    """
    gust = ""
    gust_at = text.find("G")
    if gust_at > -1:
        # Two gust characters, or three when "greater than": 16006GP99
        gust_end = gust_at + (4 if "GP" in text else 3)
        gust = text[gust_at + 1 : gust_end]
        text = text[:gust_at] + text[gust_end:]
    # Speed only once the gust is gone: 10G18
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust

Extract the direction, speed, and gust from a wind element.

def get_wind( data: list[str], units: avwx.structs.Units) -> tuple[list[str], avwx.structs.Number | None, avwx.structs.Number | None, avwx.structs.Number | None, list[avwx.structs.Number]]:
def get_wind(
    data: list[str], units: Units
) -> tuple[
    list[str],
    Number | None,
    Number | None,
    Number | None,
    list[Number],
]:
    """Return the report list, wind direction, speed, gust, and variable direction list.

    Wind elements are consumed from the front of data, which is mutated in
    place. When the wind element ends with a known unit, units.wind_speed is
    updated to match.
    """
    direction = speed = gust = ""
    variable: list[Number] = []
    # Main wind element, with or without a trailing unit
    if data and is_wind(data[0]):
        element = data.pop(0)
        for suffix, unit in WIND_UNITS.items():
            if element.endswith(suffix):
                units.wind_speed = unit
                element = element.replace(suffix, "")
                break
        direction, speed, gust = separate_wind(element)
    # Gust reported as its own element: G15
    if data:
        candidate = data[0]
        if 1 < len(candidate) < 4 and candidate[0] == "G" and candidate[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable wind direction: 350V040
    if data and is_variable_wind_direction(data[0]):
        for bound in data.pop(0).split("V"):
            number = make_number(bound, speak=bound, literal=True)
            if number is not None:
                variable.append(number)
    # Convert the collected strings to Number
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )

Return the report list, direction string, speed string, gust string, and variable direction list.

def get_visibility( data: list[str], units: avwx.structs.Units) -> tuple[list[str], avwx.structs.Number | None]:
def get_visibility(data: list[str], units: Units) -> tuple[list[str], Number | None]:
    """Return the report list and the visibility parsed from its leading element(s).

    Only the front of data is inspected. A recognised visibility element is
    popped from data and units.visibility is set to "sm" or "m" to match, so
    both arguments are mutated in place. When nothing matches, make_number is
    called with an empty string.
    """
    visibility = ""
    if data:
        item = copy(data[0])
        # Vis reported in statute miles
        if item.endswith("SM"):  # 10SM
            if item[:-2].isdigit():
                # Round-trip through int to drop leading zeros
                visibility = str(int(item[:-2]))
            elif "/" in item:
                visibility = item[: item.find("SM")]  # 1/2SM
            else:
                # Anything else keeps its text minus the unit
                visibility = item[:-2]
            data.pop(0)
            units.visibility = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            units.visibility = "m"
        # Four digits whose next character is M/N/S/E/W, or that are followed by NDV.
        # Only the digits are kept
        elif 7 >= len(item) >= 5 and item[:4].isdigit() and (item[4] in ["M", "N", "S", "E", "W"] or item[4:] == "NDV"):
            visibility = data.pop(0)[:4]
            units.visibility = "m"
        # Four digits with a one-letter M/P/B prefix. The prefix is dropped
        elif len(item) == 5 and item[1:].isdigit() and item[0] in ["M", "P", "B"]:
            visibility = data.pop(0)[1:]
            units.visibility = "m"
        # Kilometers: three zeros are appended and the unit is recorded as meters
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif len(data) > 1 and data[1].endswith("SM") and "/" in data[1] and item.isdigit():
            vis1 = data.pop(0)  # 2
            vis2 = data.pop(0).replace("SM", "")  # 1/2
            # Fold the whole part into the numerator.
            # Indexes 0 and 2 assume a single-digit numerator and denominator
            visibility = str(int(vis1) * int(vis2[2]) + int(vis2[0])) + vis2[1:]  # 5/2
            units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)

Return the report list and removed visibility string.

def sanitize_cloud(cloud: str) -> str:
def sanitize_cloud(cloud: str) -> str:
    """Fix rare cloud layer issues.

    Only the fourth character is examined. Strings shorter than four
    characters are returned unchanged.
    """
    if len(cloud) < 4:
        return cloud
    prefix, marker, rest = cloud[:3], cloud[3], cloud[4:]
    # Digits and the "/" and "-" separators are already in the right place
    if marker.isdigit() or marker in ("/", "-"):
        return cloud
    # Bad "O": FEWO03 -> FEW003
    if marker == "O":
        return f"{prefix}0{rest}"
    # Move modifiers to end: BKNC015 -> BKN015C
    if marker != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return prefix + rest + marker
    return cloud

Fix rare cloud layer issues.

def make_cloud(cloud: str) -> avwx.structs.Cloud:
def make_cloud(cloud: str) -> Cloud:
    """Return a Cloud dataclass for a cloud string.

    This function assumes the input is potentially valid.

    The string is consumed left to right: an optional top section, the cloud
    type, the base, and whatever is left over becomes the modifier. The
    original, unsanitized string is passed to Cloud as its first argument.
    """
    raw_cloud = cloud
    cloud_type = ""
    base: str | None = None
    top: str | None = None
    # Slashes are dropped after sanitizing, so FEW/// becomes FEW
    cloud = sanitize_cloud(cloud).replace("/", "")
    # Separate top
    # Only the first marker from _TOP_OFFSETS that is found is used.
    # Text after the marker is the top; text before it continues as the cloud
    for target in _TOP_OFFSETS:
        topi = cloud.find(target)
        if topi > -1:
            top, cloud = cloud[topi + len(target) :], cloud[:topi]
            break
    # Separate type
    ## BASE027
    # BASES/BASE prefixes are removed without setting a type
    if cloud.startswith("BASES"):
        cloud = cloud[5:]
    elif cloud.startswith("BASE"):
        cloud = cloud[4:]
    ## VV003
    elif cloud.startswith("VV"):
        cloud_type, cloud = cloud[:2], cloud[2:]
    ## FEW010
    elif len(cloud) >= 3 and cloud[:3] in CLOUD_LIST:
        cloud_type, cloud = cloud[:3], cloud[3:]
    ## BKN-OVC065
    # A second type joined with "-" is appended, dash included: BKN-OVC
    if len(cloud) > 4 and cloud[0] == "-" and cloud[1:4] in CLOUD_LIST:
        cloud_type += cloud[:4]
        cloud = cloud[4:]
    # Separate base
    if len(cloud) >= 3 and cloud[:3].isdigit():
        base, cloud = cloud[:3], cloud[3:]
    # An UNKN base is discarded, leaving base as None
    elif len(cloud) >= 4 and cloud[:4] == "UNKN":
        cloud = cloud[4:]
    # Remainder is considered modifiers
    modifier = cloud or None
    # Make Cloud
    # NOTE(review): _null_or_int presumably maps None to None and digit strings to int - confirm
    return Cloud(raw_cloud, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier)

Return a Cloud dataclass for a cloud string.

This function assumes the input is potentially valid.

def get_clouds(data: list[str]) -> tuple[list[str], list]:
def get_clouds(data: list[str]) -> tuple[list[str], list]:
    """Return the report list and removed list of split cloud layers.

    Any element whose first three characters are in CLOUD_LIST, or that
    starts with "VV", is popped from data and converted with make_cloud.
    """
    layers = []
    # Walk backwards so popping never shifts an index still to be visited
    for index in range(len(data) - 1, -1, -1):
        token = data[index]
        if token[:3] in CLOUD_LIST or token[:2] == "VV":
            layers.append(make_cloud(data.pop(index)))
    # Attempt cloud sort. Fails if None values are present
    try:
        layers.sort(key=lambda layer: (layer.base, layer.type))
    except TypeError:
        # Undo the backwards collection order instead
        layers.reverse()
    return data, layers

Return the report list and removed list of split cloud layers.

def get_flight_rules( visibility: avwx.structs.Number | None, ceiling: avwx.structs.Cloud | None) -> int:
def get_flight_rules(visibility: Number | None, ceiling: Cloud | None) -> int:
    """Return int based on current flight rules from parsed METAR data.

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable.
    """
    # Work out the visibility value to compare against
    vis: _Numeric
    if visibility is None:
        vis = 2
    else:
        raw = visibility.repr
        if raw == "CAVOK" or raw.startswith("P6"):
            vis = 10
        elif raw.startswith("M"):
            vis = 0
        elif visibility.value is None:
            vis = 2
        # A four character repr is taken as meters and converted to miles
        elif len(raw) == 4:
            vis = (visibility.value or 0) * 0.000621371
        else:
            vis = visibility.value or 0
    # A missing ceiling, or one without a usable base, counts as 99
    cld = (ceiling.base if ceiling else 99) or 99
    # Check the most restrictive category first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR

Return int based on current flight rules from parsed METAR data.

0=VFR, 1=MVFR, 2=IFR, 3=LIFR

Note: Common practice is to report no higher than IFR if visibility unavailable.

def get_ceiling(clouds: list[avwx.structs.Cloud]) -> avwx.structs.Cloud | None:
def get_ceiling(clouds: list[Cloud]) -> Cloud | None:
    """Return ceiling layer from Cloud-List or None if none found.

    Assumes that the clouds are already sorted lowest to highest.

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (eg. '' or 'FEW///')
    """
    for layer in clouds:
        if layer.base and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None

Return ceiling layer from Cloud-List or None if none found.

Assumes that the clouds are already sorted lowest to highest.

Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings.

Prevents errors due to lack of cloud information (eg. '' or 'FEW///')

def is_altitude(value: str) -> bool:
def is_altitude(value: str) -> bool:
    """Return True if the value is a possible altitude.

    Accepts values starting with "SFC/", values starting with "FL" and three
    digits, and values whose text before the first "/" ends in "FT" preceded
    by digits. Anything shorter than five characters is rejected.
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/") or (value.startswith("FL") and value[2:5].isdigit()):
        return True
    lower = value.partition("/")[0]
    # Up to three characters right before the FT suffix must be digits
    return lower.endswith("FT") and lower[-5:-2].isdigit()

Return True if the value is a possible altitude.

def make_altitude( value: str, units: avwx.structs.Units, repr: str | None = None, *, force_fl: bool = False) -> tuple[avwx.structs.Number | None, avwx.structs.Units]:
def make_altitude(
    value: str,
    units: Units,
    repr: str | None = None,  # noqa: A002
    *,
    force_fl: bool = False,
) -> tuple[Number | None, Units]:
    """Convert altitude string into a number.

    A trailing "FT" or "M" is removed and recorded on units.altitude as "ft"
    or "m". A value with such a suffix is never forced to a flight level.
    An "F" followed by digits is rewritten as a flight level (F430 -> FL430),
    and force_fl prefixes "FL" to values that do not already have it.

    Returns (None, units) with units left untouched when value is empty or
    contains nothing but unit suffixes. The Number's repr is the repr
    argument when given, otherwise the original value.
    """
    if not value:
        return None, units
    raw = repr or value
    unit: str | None = None
    for end in ("FT", "M"):
        if value.endswith(end):
            unit = end.lower()
            value = value.removesuffix(end)
    # A bare "FT" or "M" leaves nothing to parse. Stop before indexing into
    # the empty string and before recording a unit for a nonexistent value
    if not value:
        return None, units
    if unit is not None:
        force_fl = False
        units.altitude = unit
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units

Convert altitude string into a number.

def parse_date( date: str, hour_threshold: int = 200, *, time_only: bool = False, target: datetime.date | None = None) -> datetime.datetime | None:
def parse_date(
    date: str,
    hour_threshold: int = 200,
    *,
    time_only: bool = False,
    target: dt.date | None = None,
) -> dt.datetime | None:
    """Parse a report timestamp in ddhhZ or ddhhmmZ format.

    If time_only, assumes hhmm format with current or previous day.

    This function assumes the given timestamp is within the hour threshold from current date.

    The reference point is midnight UTC of target when given, otherwise the
    current UTC time. Returns None when the text is not all digits once any
    "Z" is stripped, has an unexpected length, or names a day that does not
    exist in the reference month.
    """
    # Format date string
    date = date.strip("Z")
    if not date.isdigit():
        return None
    if time_only:
        if len(date) != 4:
            return None
        # hhmm: the hour starts at the first character
        index_hour = 0
    else:
        # ddhh: pad with zero minutes so both formats share one path
        if len(date) == 4:
            date += "00"
        if len(date) != 6:
            return None
        # ddhhmm: the hour follows the two day digits
        index_hour = 2
    # Create initial guess
    if target:
        target = dt.datetime(target.year, target.month, target.day, tzinfo=dt.timezone.utc)
    else:
        target = dt.datetime.now(tz=dt.timezone.utc)
    day = target.day if time_only else int(date[:2])
    hour = int(date[index_hour : index_hour + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = False
    if day > monthrange(target.year, target.month)[1]:
        target += relativedelta(months=-1)
        shifted = True
    try:
        # Hour and minute are wrapped into range here.
        # An hour above 23 is carried into the next day below
        guess = target.replace(
            day=day,
            hour=hour % 24,
            minute=int(date[index_hour + 2 : index_hour + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        # The day does not exist in the (possibly shifted) reference month
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    # Handle changing months if not already shifted
    if not shifted:
        # Signed distance from the reference point in hours
        hourdiff = (guess - target) / dt.timedelta(minutes=1) / 60
        if hourdiff > hour_threshold:
            guess += relativedelta(months=-1)
        elif hourdiff < -hour_threshold:
            guess += relativedelta(months=+1)
    return guess

Parse a report timestamp in ddhhZ or ddhhmmZ format.

If time_only, assumes hhmm format with current or previous day.

This function assumes the given timestamp is within the hour threshold from current date.

def make_timestamp( timestamp: str | None, *, time_only: bool = False, target_date: datetime.date | None = None) -> avwx.structs.Timestamp | None:
def make_timestamp(
    timestamp: str | None,
    *,
    time_only: bool = False,
    target_date: dt.date | None = None,
) -> Timestamp | None:
    """Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.

    Returns None when the timestamp is None or empty. Otherwise the Timestamp
    holds the original string and whatever parse_date produced for it.
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None

Return a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format.

def is_runway_visibility(item: str) -> bool:
def is_runway_visibility(item: str) -> bool:
    """Return True if the item is a runway visibility range string.

    The item must start with "R" and two digits, with a "/" as its fourth or
    fifth character.
    """
    if len(item) <= 4 or item[0] != "R":
        return False
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 is a runway state, not a visibility range
    return item[1:3].isdigit() and "CLRD" not in item

Return True if the item is a runway visibility range string.