avwx.parsing.core

Contains the core parsing and indent functions of avwx

  1"""
  2Contains the core parsing and indent functions of avwx
  3"""
  4
  5# pylint: disable=redefined-builtin
  6
  7# stdlib
  8import re
  9import datetime as dt
 10import math
 11from calendar import monthrange
 12from contextlib import suppress
 13from copy import copy
 14from typing import Any, Iterable, List, Optional, Tuple, Union
 15
 16# library
 17from dateutil.relativedelta import relativedelta
 18
 19# module
 20from avwx.static.core import (
 21    CARDINALS,
 22    CLOUD_LIST,
 23    FRACTIONS,
 24    NUMBER_REPL,
 25    SPECIAL_NUMBERS,
 26    WIND_UNITS,
 27)
 28from avwx.structs import Cloud, Fraction, Number, Timestamp, Units
 29
 30
 31def dedupe(items: Iterable[Any], only_neighbors: bool = False) -> List[Any]:
 32    """Deduplicate a list while keeping order
 33
 34    If only_neighbors is True, dedupe will only check neighboring values
 35    """
 36    ret: List[Any] = []
 37    for item in items:
 38        if (only_neighbors and ret and ret[-1] != item) or item not in ret:
 39            ret.append(item)
 40    return ret
 41
 42
 43def is_unknown(value: str) -> bool:
 44    """Returns True if val represents and unknown value"""
 45    if not isinstance(value, str):
 46        raise TypeError
 47    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
 48        return True
 49    for char in value:
 50        if char not in ("/", "X", "."):
 51            break
 52    else:
 53        return True
 54    return False
 55
 56
 57def get_digit_list(data: List[str], from_index: int) -> Tuple[List[str], List[str]]:
 58    """Returns a list of items removed from a given list of strings
 59    that are all digits from 'from_index' until hitting a non-digit item
 60    """
 61    ret = []
 62    data.pop(from_index)
 63    while len(data) > from_index and data[from_index].isdigit():
 64        ret.append(data.pop(from_index))
 65    return data, ret
 66
 67
 68def unpack_fraction(num: str) -> str:
 69    """Returns unpacked fraction string 5/2 -> 2 1/2"""
 70    numbers = [int(n) for n in num.split("/") if n]
 71    if len(numbers) != 2 or numbers[0] <= numbers[1]:
 72        return num
 73    numerator, denominator = numbers
 74    over = numerator // denominator
 75    rem = numerator % denominator
 76    return f"{over} {rem}/{denominator}"
 77
 78
 79def remove_leading_zeros(num: str) -> str:
 80    """Strips zeros while handling -, M, and empty strings"""
 81    if not num:
 82        return num
 83    if num.startswith("M"):
 84        ret = "M" + num[1:].lstrip("0")
 85    elif num.startswith("-"):
 86        ret = "-" + num[1:].lstrip("0")
 87    else:
 88        ret = num.lstrip("0")
 89    return "0" if ret in ("", "M", "-") else ret
 90
 91
 92SPOKEN_POSTFIX = (
 93    (" zero zero zero", " thousand"),
 94    (" zero zero", " hundred"),
 95)
 96
 97
 98def spoken_number(num: str, literal: bool = False) -> str:
 99    """Returns the spoken version of a number
100
101    If literal, no conversion to hundreds/thousands
102
103    Ex: 1.2 -> one point two
104        1 1/2 -> one and one half
105        25000 -> two five thousand
106    """
107    ret = []
108    for part in num.split():
109        if part in FRACTIONS:
110            ret.append(FRACTIONS[part])
111        else:
112            val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
113            if not literal:
114                for target, replacement in SPOKEN_POSTFIX:
115                    if val.endswith(target):
116                        val = val[: -len(target)] + replacement
117            ret.append(val)
118    return " and ".join(ret)
119
120
def make_fraction(
    num: str, repr: Optional[str] = None, literal: bool = False, speak_prefix: str = ""
) -> Fraction:
    """Returns a Fraction dataclass for numbers with / in them

    Only the last character before the "/" is read as the numerator. Anything
    in front of it is a whole number that gets folded into the numerator, so
    "11/2" and "1-1/2" both become 3/2

    repr overrides the stored representation, literal is passed on to
    spoken_number, and speak_prefix is put in front of the spoken text

    Raises ValueError if the text does not split into exactly two parts on "/"
    or a part is not an integer
    """
    top, bottom = num.split("/")
    # 2-1/2 but not -2 1/2
    if not top.startswith("-") and "-" in top:
        top = top.replace("-", " ")
    denominator = int(bottom)
    if len(top) <= 1:
        numerator = int(top)
    else:
        # Fold the leading whole number into the single-digit numerator
        whole = int(top[:-1])
        numerator = whole * denominator + int(top[-1])
        num = f"{numerator}/{denominator}"
    value = numerator / denominator
    unpacked = unpack_fraction(num)
    spoken = speak_prefix + spoken_number(unpacked, literal)
    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)
140
141
def make_number(
    num: Optional[str],
    repr: Optional[str] = None,
    speak: Optional[str] = None,
    literal: bool = False,
    special: Optional[dict] = None,
    m_minus: bool = True,
) -> Union[Number, Fraction, None]:  # sourcery skip: avoid-builtin-shadow
    """Returns a Number or Fraction dataclass for a number string

    Returns None when the string is empty, unknown, or has nothing left once
    its prefixes are removed

    repr overrides the stored representation and speak overrides the text
    that gets spoken. special maps strings to a value or a (value, spoken)
    tuple and is checked before SPECIAL_NUMBERS

    If literal, spoken string will not convert to hundreds/thousands

    If m_minus, an "M" inside the number is read as a minus sign. Otherwise a
    leading "M" or "P" means less than / greater than and the value is nulled

    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers
    """
    # pylint: disable=too-many-branches
    if not num or is_unknown(num):
        return None
    # Special values return right away. A missing key falls through to parsing
    with suppress(KeyError):
        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
        if isinstance(item, tuple):
            value, spoken = item
        else:
            value, spoken = item, spoken_number(str(item), literal=literal)
        return Number(repr or num, value, spoken)
    # Cardinal directions are swapped for their numeric value
    if num in CARDINALS:
        repr = repr or num
        num = str(CARDINALS[num])
    val_str = num
    # Remove unit suffixes
    if val_str.endswith("SM"):
        repr = val_str[:]
        val_str = val_str[:-2]
    # Remove spurious characters
    num = num.rstrip("M.").replace("O", "0").replace("+", "").replace(",", "")
    # Handle Minus values with errors like 0M04
    if m_minus and "M" in num:
        val_str = num.replace("MM", "-").replace("M", "-")
        # Drop whatever comes before the first minus sign
        val_str = val_str[val_str.index("-") :]
    # Check value prefixes. Each one is tested in turn, so several can apply
    speak_prefix = ""
    for prefix, words in (("ABV ", "above "), ("BLW ", "below ")):
        if val_str.startswith(prefix):
            speak_prefix += words
            val_str = val_str[len(prefix) :]
    if val_str.startswith("FL"):
        speak_prefix += "flight level "
        val_str, literal = val_str[2:], True
    for marker, words in (("M", "less than "), ("P", "greater than ")):
        if val_str.startswith(marker):
            speak_prefix += words
            repr = repr or val_str
            val_str = val_str[1:]
    # Create Number
    if not val_str:
        return None
    ret: Union[Number, Fraction, None] = None
    if "/" in val_str:
        ret = make_fraction(val_str, repr, literal, speak_prefix=speak_prefix)
    else:
        if "." in num:
            # Overwrite float 0 due to "0.0" literal
            value = float(val_str) or 0
        else:
            value = int(val_str)
        spoken = speak_prefix + spoken_number(speak or str(value), literal)
        ret = Number(repr or num, value, spoken)
    # Null the value if "greater than"/"less than"
    if ret and not m_minus and repr and repr.startswith(("M", "P")):
        ret.value = None
    return ret
223
224
def find_first_in_list(txt: str, str_list: List[str]) -> int:
    """Returns the lowest index at which any item of the list occurs in the string

    Returns -1 if none of the items occur

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    hits = [position for position in map(txt.find, str_list) if position > -1]
    return min(hits) if hits else -1
235
236
def is_timestamp(item: str) -> bool:
    """Returns True if the item is six digits followed by "Z" (ex: 121530Z)"""
    if len(item) != 7:
        return False
    return item.endswith("Z") and item[:6].isdigit()
240
241
def is_timerange(item: str) -> bool:
    """Returns True if the item is a TAF to-from time range

    The expected shape is four digits, a "/", and four more digits (ex: 1206/1212)
    """
    if len(item) != 9 or item[4] != "/":
        return False
    start, end = item[:4], item[5:]
    return start.isdigit() and end.isdigit()
247
248
def is_possible_temp(temp: str) -> bool:
    """Returns True if every character is a digit or 'M' (for minus)

    An empty string has no offending character and also returns True
    """
    for char in temp:
        if char != "M" and not char.isdigit():
            return False
    return True
252
253
_Numeric = Union[int, float]


def relative_humidity(
    temperature: _Numeric, dewpoint: _Numeric, unit: str = "C"
) -> float:
    """Calculates the relative humidity as a ratio, 1.0 when dewpoint equals temperature

    Both inputs are converted to Celsius first when unit is "F"
    """
    if unit == "F":
        dewpoint = (dewpoint - 32) * 5 / 9
        temperature = (temperature - 32) * 5 / 9
    # Saturation vapor pressure terms, written without their leading constant
    dewpoint_term = math.exp((17.67 * dewpoint) / (243.5 + dewpoint))
    temperature_term = math.exp((17.67 * temperature) / (243.5 + temperature))
    return dewpoint_term / temperature_term
270
271
272# https://aviation.stackexchange.com/questions/47971/how-do-i-calculate-density-altitude-by-hand
273
274
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculates the pressure altitude in feet

    A pressure given in "hPa" is converted to inHg first. The result is the
    altitude plus 1000 for every inHg the pressure sits below 29.92, rounded
    """
    in_hg = pressure * 0.02953 if unit == "hPa" else pressure
    return round((29.92 - in_hg) * 1000 + altitude)
280
281
def density_altitude(
    pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units
) -> int:
    """Calculates the density altitude in feet

    Temperature is converted to Celsius when units.temperature is "F" and
    pressure to inHg when units.altimeter is "hPa". The result is the pressure
    altitude plus 120 for every degree the temperature is above the standard
    value of 15 minus 2 per 1000 of altitude, rounded
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    in_hg = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    standard = 15 - (2 * altitude / 1000)
    return round(((celsius - standard) * 120) + pressure_altitude(in_hg, altitude))
293
294
def get_station_and_time(
    data: List[str],
) -> Tuple[List[str], Optional[str], Optional[str]]:
    """Returns the report list and removed station ident and time strings

    The first item is always taken as the station. The next item is taken as
    the time when it is digits followed by "Z", or exactly six digits, in
    which case the missing "Z" is added. data is modified in place
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    report_time = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            report_time = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            report_time = data.pop(0) + "Z"
    return data, station, report_time
310
311
def is_wind(text: str) -> bool:
    """Returns True if the text is likely a normal wind element

    Accepts text that ends in a WIND_UNITS key with two digits in front of
    it, and unit-less text that starts with five digits or VRB plus two digits
    """
    # Ignore wind shear
    if text.startswith("WS"):
        return False
    # 09010KT, 09010G15KT
    if len(text) > 4:
        for ending in WIND_UNITS:
            if not text.endswith(ending):
                continue
            unit_index = text.find(ending)
            if text[unit_index - 2 : unit_index].isdigit():
                return True
    # 09010  09010G15 VRB10
    has_gust_shape = len(text) >= 8 and "G" in text and "/" not in text
    if len(text) != 5 and not has_gust_shape:
        return False
    return text[:5].isdigit() or (text.startswith("VRB") and text[3:5].isdigit())
327
328
# Three digits, a "V", and three more digits
VARIABLE_DIRECTION_PATTERN = re.compile(r"\d{3}V\d{3}")


def is_variable_wind_direction(text: str) -> bool:
    """Returns True if the first seven characters look like 350V040"""
    return len(text) >= 7 and VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
337
338
def separate_wind(text: str) -> Tuple[str, str, str]:
    """Extracts the direction, speed, and gust from a wind element

    The gust is the two characters after the first "G", or three when the
    text contains "GP". What is left is read as speed only when it is two
    characters long, otherwise as a three character direction plus speed
    """
    gust = ""
    g_index = text.find("G")
    if g_index > -1:
        # 16006GP99KT ie gust greater than
        width = 3 if "GP" in text else 2
        gust_end = g_index + 1 + width
        gust = text[g_index + 1 : gust_end]
        text = text[:g_index] + text[gust_end:]
    # 10G18KT
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust
359
360
def get_wind(
    data: List[str], units: Units
) -> Tuple[
    List[str],
    Optional[Number],
    Optional[Number],
    Optional[Number],
    List[Number],
]:
    """Returns the report list and removed:

    Direction, speed, gust, and variable direction list, all as Numbers

    data is modified in place. units.wind_speed is set when the wind element
    ends with a WIND_UNITS key
    """
    direction, speed, gust = "", "", ""
    variable: List[Number] = []
    # Remove unit and split elements
    if data and is_wind(data[0]):
        item = data.pop(0)
        for key, unit in WIND_UNITS.items():
            if item.endswith(key):
                units.wind_speed = unit
                item = item.replace(key, "")
                break
        direction, speed, gust = separate_wind(item)
    # Separated Gust
    if data:
        token = data[0]
        if 1 < len(token) < 4 and token[0] == "G" and token[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable Wind Direction
    if data and is_variable_wind_direction(data[0]):
        for bound in data.pop(0).split("V"):
            value = make_number(bound, speak=bound, literal=True)
            if value is not None:
                variable.append(value)
    # Convert to Number
    direction_value = make_number(direction, speak=direction, literal=True)
    speed_value = make_number(speed.strip("BV"), m_minus=False)
    gust_value = make_number(gust, m_minus=False)
    return data, direction_value, speed_value, gust_value, variable
401
402
def get_visibility(data: List[str], units: Units) -> Tuple[List[str], Optional[Number]]:
    """Returns the report list and removed visibility as a Number

    Only the front of the list is checked. When a visibility is found it is
    removed from data in place and units.visibility is set to "sm" or "m"
    """
    visibility = ""
    if data:
        item = data[0]
        # Vis reported in statute miles
        if item.endswith("SM"):  # 10SM
            miles = item[:-2]
            if miles.isdigit():
                visibility = str(int(miles))
            elif "/" in item:
                visibility = item[: item.find("SM")]  # 1/2SM
            else:
                visibility = miles
            data.pop(0)
            units.visibility = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            units.visibility = "m"
        # Four digits followed by a direction or NDV
        elif (
            5 <= len(item) <= 7
            and item[:4].isdigit()
            and (item[4] in ("M", "N", "S", "E", "W") or item[4:] == "NDV")
        ):
            visibility = data.pop(0)[:4]
            units.visibility = "m"
        # One prefix character followed by four digits
        elif len(item) == 5 and item[1:].isdigit() and item[0] in ("M", "P", "B"):
            visibility = data.pop(0)[1:]
            units.visibility = "m"
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            units.visibility = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif (
            len(data) > 1
            and data[1].endswith("SM")
            and "/" in data[1]
            and item.isdigit()
        ):
            whole = data.pop(0)  # 2
            fraction = data.pop(0).replace("SM", "")  # 1/2
            # Reads one character each for numerator and denominator, so this
            # assumes both are single digits
            # NOTE(review): a combined numerator of two digits (2 3/4 -> 11/4)
            # may be re-read by make_fraction as whole number plus digit - confirm
            numerator = int(whole) * int(fraction[2]) + int(fraction[0])
            visibility = str(numerator) + fraction[1:]  # 5/2
            units.visibility = "sm"
    return data, make_number(visibility, m_minus=False)
448
449
def sanitize_cloud(cloud: str) -> str:
    """Fix rare cloud layer issues

    Only the fourth character is inspected. Text shorter than four characters,
    or with a digit, "/", or "-" in that position, is returned unchanged
    """
    if len(cloud) < 4:
        return cloud
    fourth = cloud[3]
    if fourth.isdigit() or fourth in ("/", "-"):
        return cloud
    # Bad "O": FEWO03 -> FEW003
    if fourth == "O":
        return f"{cloud[:3]}0{cloud[4:]}"
    # Move modifiers to end: BKNC015 -> BKN015C
    if fourth != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return cloud[:3] + cloud[4:] + fourth
    return cloud
462
463
def _null_or_int(val: Optional[str]) -> Optional[int]:
    """Converts a string to int, or returns None for non-strings and unknown values"""
    if isinstance(val, str) and not is_unknown(val):
        return int(val)
    return None
467
468
# Markers that separate a layer from its top. The longer one is tried first
_TOP_OFFSETS = ("-TOPS", "-TOP")


def make_cloud(cloud: str) -> Cloud:
    """Returns a Cloud dataclass for a cloud string

    The text is sanitized and stripped of "/" and then taken apart in order:
    top, type, base, and whatever remains as the modifier. Parts that are
    missing are stored as None

    This function assumes the input is potentially valid
    """
    raw_cloud = cloud
    cloud_type = ""
    base: Optional[str] = None
    top: Optional[str] = None
    cloud = sanitize_cloud(cloud).replace("/", "")
    # Separate top
    for marker in _TOP_OFFSETS:
        layer, found, above = cloud.partition(marker)
        if found:
            top, cloud = above, layer
            break
    # Separate type
    if cloud.startswith("BASES"):
        cloud = cloud[5:]
    elif cloud.startswith("BASE"):  # BASE027
        cloud = cloud[4:]
    elif cloud.startswith("VV"):  # VV003
        cloud_type, cloud = cloud[:2], cloud[2:]
    elif len(cloud) >= 3 and cloud[:3] in CLOUD_LIST:  # FEW010
        cloud_type, cloud = cloud[:3], cloud[3:]
    # A second type joined with a dash: BKN-OVC065
    if len(cloud) > 4 and cloud.startswith("-") and cloud[1:4] in CLOUD_LIST:
        cloud_type += cloud[:4]
        cloud = cloud[4:]
    # Separate base
    if len(cloud) >= 3 and cloud[:3].isdigit():
        base, cloud = cloud[:3], cloud[3:]
    elif cloud.startswith("UNKN"):
        cloud = cloud[4:]
    # Remainder is considered modifiers
    modifier: Optional[str] = cloud or None
    return Cloud(
        raw_cloud, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier
    )
517
518
def get_clouds(data: List[str]) -> Tuple[List[str], list]:
    """Returns the report list and removed list of split cloud layers

    Every item whose first three characters are in CLOUD_LIST, or that starts
    with "VV", is removed from data in place and converted with make_cloud
    """
    clouds = []
    # Walk from the back so removing an item never shifts one still to be checked
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(index)))
    # Attempt cloud sort. Fails if None values are present
    try:
        clouds.sort(key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        # Layers were collected back to front; flip them toward report order
        clouds.reverse()
    return data, clouds
532
533
def get_flight_rules(visibility: Optional[Number], ceiling: Optional[Cloud]) -> int:
    # sourcery skip: assign-if-exp, reintroduce-else
    """Returns int based on current flight rules from parsed METAR data

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    visibility is the parsed visibility or None. ceiling is the ceiling layer
    or None. A missing ceiling or ceiling base counts as 99

    Note: Common practice is to report no higher than IFR if visibility unavailable
    """
    # Parse visibility
    vis: Union[int, float]
    if visibility is None:
        vis = 2
    elif visibility.repr == "CAVOK" or visibility.repr.startswith("P6"):
        vis = 10
    elif visibility.repr.startswith("M"):
        vis = 0
    elif visibility.value is None:
        vis = 2
    # Convert meters to miles. Meters are four digits; requiring digits keeps
    # four character mile fractions such as 11/4 from being scaled down
    elif len(visibility.repr) == 4 and visibility.repr.isdigit():
        vis = (visibility.value or 0) * 0.000621371
    else:
        vis = visibility.value or 0
    # Parse ceiling
    cld = (ceiling.base if ceiling else 99) or 99
    # Determine flight rules
    if (vis <= 5) or (cld <= 30):
        if (vis < 3) or (cld < 10):
            if (vis < 1) or (cld < 5):
                return 3  # LIFR
            return 2  # IFR
        return 1  # MVFR
    return 0  # VFR
567
568
def get_ceiling(clouds: List[Cloud]) -> Optional[Cloud]:
    """Returns the first layer that counts as a ceiling, or None if there is none

    Assumes that the clouds are already sorted lowest to highest

    Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (eg. '' or 'FEW///')
    """
    for layer in clouds:
        if layer.base and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None
579
580
def is_altitude(value: str) -> bool:
    """Returns True if the value is a possible altitude

    Accepts text of five or more characters that starts with "SFC/", starts
    with "FL" plus three digits, or whose part before the first "/" ends in
    three digits plus "FT"
    """
    if len(value) < 5:
        return False
    if value.startswith("SFC/") or (value[:2] == "FL" and value[2:5].isdigit()):
        return True
    lower = value.split("/")[0]
    return lower.endswith("FT") and lower[-5:-2].isdigit()
591
592
def make_altitude(
    value: str, units: Units, repr: Optional[str] = None, force_fl: bool = False
) -> Tuple[Optional[Number], Units]:
    """Convert altitude string into a number

    A trailing "FT" or "M" is removed, stored in lower case on units.altitude,
    and turns force_fl off. "F" followed by digits is expanded to "FL". With
    force_fl, a value that does not already start with "FL" gets that prefix

    Returns the Number, or None when there is no value to convert, together
    with the units object
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            force_fl = False
            units.altitude = end.lower()
            # post 3.8 value = value.removesuffix(end)
            value = value[: -len(end)]
    # Only a unit was given (ex: "FT"), so nothing is left to convert.
    # Indexing the empty string below would raise IndexError
    if not value:
        return None, units
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units
612
613
def parse_date(
    date: str,
    hour_threshold: int = 200,
    time_only: bool = False,
    target: Optional[dt.date] = None,
) -> Optional[dt.datetime]:
    """Parses a report timestamp in ddhhZ or ddhhmmZ format

    If time_only, expects hhmm format and uses the day of the reference date

    The reference date is midnight UTC of target when given, otherwise the
    current UTC time. Returns None if the text is not digits of the expected
    length or does not form a valid date

    This function assumes the given timestamp is within the hour threshold from current date
    """
    # pylint: disable=too-many-branches
    # Format date string
    digits = date.strip("Z")
    if not digits.isdigit():
        return None
    if not time_only and len(digits) == 4:
        digits += "00"
    if len(digits) != (4 if time_only else 6):
        return None
    hour_at = 0 if time_only else 2
    # Create initial guess
    if target:
        reference = dt.datetime(
            target.year, target.month, target.day, tzinfo=dt.timezone.utc
        )
    else:
        reference = dt.datetime.now(tz=dt.timezone.utc)
    day = reference.day if time_only else int(digits[:2])
    hour = int(digits[hour_at : hour_at + 2])
    # Handle situation where next month has less days than current month
    # Shifted value makes sure that a month shift doesn't happen twice
    shifted = day > monthrange(reference.year, reference.month)[1]
    if shifted:
        reference += relativedelta(months=-1)
    try:
        guess = reference.replace(
            day=day,
            hour=hour % 24,
            minute=int(digits[hour_at + 2 : hour_at + 4]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    if shifted:
        return guess
    # Handle changing months if not already shifted
    hourdiff = (guess - reference) / dt.timedelta(minutes=1) / 60
    if hourdiff > hour_threshold:
        guess += relativedelta(months=-1)
    elif hourdiff < -hour_threshold:
        guess += relativedelta(months=+1)
    return guess
677
678
def make_timestamp(
    timestamp: Optional[str],
    time_only: bool = False,
    target_date: Optional[dt.date] = None,
) -> Optional[Timestamp]:
    """Returns a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format

    Returns None for an empty timestamp. The Timestamp holds the original
    text and the result of parse_date, which may itself be None
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None
689
690
def is_runway_visibility(item: str) -> bool:
    """Returns True if the item is a runway visibility range string

    The item must be longer than four characters, start with "R" and two
    digits, and have a "/" as its fourth or fifth character
    """
    if len(item) <= 4 or item[0] != "R":
        return False
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 is a runway state, not a visibility
    return item[1:3].isdigit() and "CLRD" not in item
def dedupe(items: Iterable[Any], only_neighbors: bool = False) -> List[Any]:
32def dedupe(items: Iterable[Any], only_neighbors: bool = False) -> List[Any]:
33    """Deduplicate a list while keeping order
34
35    If only_neighbors is True, dedupe will only check neighboring values
36    """
37    ret: List[Any] = []
38    for item in items:
39        if (only_neighbors and ret and ret[-1] != item) or item not in ret:
40            ret.append(item)
41    return ret

Deduplicate a list while keeping order

If only_neighbors is True, dedupe will only check neighboring values

def is_unknown(value: str) -> bool:
44def is_unknown(value: str) -> bool:
45    """Returns True if val represents and unknown value"""
46    if not isinstance(value, str):
47        raise TypeError
48    if not value or value.upper() in {"UNKN", "UNK", "UKN"}:
49        return True
50    for char in value:
51        if char not in ("/", "X", "."):
52            break
53    else:
54        return True
55    return False

Returns True if the value represents an unknown value

def get_digit_list(data: List[str], from_index: int) -> Tuple[List[str], List[str]]:
58def get_digit_list(data: List[str], from_index: int) -> Tuple[List[str], List[str]]:
59    """Returns a list of items removed from a given list of strings
60    that are all digits from 'from_index' until hitting a non-digit item
61    """
62    ret = []
63    data.pop(from_index)
64    while len(data) > from_index and data[from_index].isdigit():
65        ret.append(data.pop(from_index))
66    return data, ret

Returns a list of items removed from a given list of strings that are all digits from 'from_index' until hitting a non-digit item

def unpack_fraction(num: str) -> str:
69def unpack_fraction(num: str) -> str:
70    """Returns unpacked fraction string 5/2 -> 2 1/2"""
71    numbers = [int(n) for n in num.split("/") if n]
72    if len(numbers) != 2 or numbers[0] <= numbers[1]:
73        return num
74    numerator, denominator = numbers
75    over = numerator // denominator
76    rem = numerator % denominator
77    return f"{over} {rem}/{denominator}"

Returns unpacked fraction string 5/2 -> 2 1/2

def remove_leading_zeros(num: str) -> str:
80def remove_leading_zeros(num: str) -> str:
81    """Strips zeros while handling -, M, and empty strings"""
82    if not num:
83        return num
84    if num.startswith("M"):
85        ret = "M" + num[1:].lstrip("0")
86    elif num.startswith("-"):
87        ret = "-" + num[1:].lstrip("0")
88    else:
89        ret = num.lstrip("0")
90    return "0" if ret in ("", "M", "-") else ret

Strips zeros while handling -, M, and empty strings

SPOKEN_POSTFIX = ((' zero zero zero', ' thousand'), (' zero zero', ' hundred'))
def spoken_number(num: str, literal: bool = False) -> str:
 99def spoken_number(num: str, literal: bool = False) -> str:
100    """Returns the spoken version of a number
101
102    If literal, no conversion to hundreds/thousands
103
104    Ex: 1.2 -> one point two
105        1 1/2 -> one and one half
106        25000 -> two five thousand
107    """
108    ret = []
109    for part in num.split():
110        if part in FRACTIONS:
111            ret.append(FRACTIONS[part])
112        else:
113            val = " ".join(NUMBER_REPL[char] for char in part if char in NUMBER_REPL)
114            if not literal:
115                for target, replacement in SPOKEN_POSTFIX:
116                    if val.endswith(target):
117                        val = val[: -len(target)] + replacement
118            ret.append(val)
119    return " and ".join(ret)

Returns the spoken version of a number

If literal, no conversion to hundreds/thousands

Ex: 1.2 -> one point two; 1 1/2 -> one and one half; 25000 -> two five thousand

def make_fraction( num: str, repr: Optional[str] = None, literal: bool = False, speak_prefix: str = '') -> avwx.structs.Fraction:
122def make_fraction(
123    num: str, repr: Optional[str] = None, literal: bool = False, speak_prefix: str = ""
124) -> Fraction:
125    """Returns a fraction dataclass for numbers with / in them"""
126    num_str, den_str = num.split("/")
127    # 2-1/2 but not -2 1/2
128    if "-" in num_str and not num_str.startswith("-"):
129        num_str = num_str.replace("-", " ")
130    denominator = int(den_str)
131    # Multiply multi-digit numerator
132    if len(num_str) > 1:
133        numerator = int(num_str[:-1]) * denominator + int(num_str[-1])
134        num = f"{numerator}/{denominator}"
135    else:
136        numerator = int(num_str)
137    value = numerator / denominator
138    unpacked = unpack_fraction(num)
139    spoken = speak_prefix + spoken_number(unpacked, literal)
140    return Fraction(repr or num, value, spoken, numerator, denominator, unpacked)

Returns a fraction dataclass for numbers with / in them

def make_number( num: Optional[str], repr: Optional[str] = None, speak: Optional[str] = None, literal: bool = False, special: Optional[dict] = None, m_minus: bool = True) -> Union[avwx.structs.Number, avwx.structs.Fraction, NoneType]:
143def make_number(
144    num: Optional[str],
145    repr: Optional[str] = None,
146    speak: Optional[str] = None,
147    literal: bool = False,
148    special: Optional[dict] = None,
149    m_minus: bool = True,
150) -> Union[Number, Fraction, None]:  # sourcery skip: avoid-builtin-shadow
151    """Returns a Number or Fraction dataclass for a number string
152
153    If literal, spoken string will not convert to hundreds/thousands
154
155    NOTE: Numerators are assumed to have a single digit. Additional are whole numbers
156    """
157    # pylint: disable=too-many-branches
158    if not num or is_unknown(num):
159        return None
160    # Check special
161    with suppress(KeyError):
162        item = (special or {}).get(num) or SPECIAL_NUMBERS[num]
163        if isinstance(item, tuple):
164            value, spoken = item
165        else:
166            value = item
167            spoken = spoken_number(str(value), literal=literal)
168        return Number(repr or num, value, spoken)
169    # Check cardinal direction
170    if num in CARDINALS:
171        if not repr:
172            repr = num
173        num = str(CARDINALS[num])
174    val_str = num
175    # Remove unit suffixes
176    if val_str.endswith("SM"):
177        repr = val_str[:]
178        val_str = val_str[:-2]
179    # Remove spurious characters from the end
180    num = num.rstrip("M.")
181    num = num.replace("O", "0")
182    num = num.replace("+", "")
183    num = num.replace(",", "")
184    # Handle Minus values with errors like 0M04
185    if m_minus and "M" in num:
186        val_str = num.replace("MM", "-").replace("M", "-")
187        while val_str[0] != "-":
188            val_str = val_str[1:]
189    # Check value prefixes
190    speak_prefix = ""
191    if val_str.startswith("ABV "):
192        speak_prefix += "above "
193        val_str = val_str[4:]
194    if val_str.startswith("BLW "):
195        speak_prefix += "below "
196        val_str = val_str[4:]
197    if val_str.startswith("FL"):
198        speak_prefix += "flight level "
199        val_str, literal = val_str[2:], True
200    if val_str.startswith("M"):
201        speak_prefix += "less than "
202        repr = repr or val_str
203        val_str = val_str[1:]
204    if val_str.startswith("P"):
205        speak_prefix += "greater than "
206        repr = repr or val_str
207        val_str = val_str[1:]
208    # Create Number
209    if not val_str:
210        return None
211    ret: Union[Number, Fraction, None] = None
212    # Create Fraction
213    if "/" in val_str:
214        ret = make_fraction(val_str, repr, literal, speak_prefix=speak_prefix)
215    else:
216        # Overwrite float 0 due to "0.0" literal
217        value = float(val_str) or 0 if "." in num else int(val_str)
218        spoken = speak_prefix + spoken_number(speak or str(value), literal)
219        ret = Number(repr or num, value, spoken)
220    # Null the value if "greater than"/"less than"
221    if ret and not m_minus and repr and repr.startswith(("M", "P")):
222        ret.value = None
223    return ret

Returns a Number or Fraction dataclass for a number string

If literal, spoken string will not convert to hundreds/thousands

NOTE: Numerators are assumed to have a single digit. Any additional leading digits are treated as a whole number

def find_first_in_list(txt: str, str_list: List[str]) -> int:
def find_first_in_list(txt: str, str_list: List[str]) -> int:
    """Locates the earliest position in a string where any item from a list begins

    Returns -1 when none of the items occur in the string

    Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3
    """
    positions = (txt.find(candidate) for candidate in str_list)
    # str.find reports -1 for a miss, so only keep the real hits
    hits = [position for position in positions if position > -1]
    return min(hits) if hits else -1

Returns the index of the earliest occurrence of an item from a list in a string

Ex: find_first_in_list('foobar', ['bar', 'fin']) -> 3

def is_timestamp(item: str) -> bool:
def is_timestamp(item: str) -> bool:
    """Checks that the item is six digits followed by a trailing 'Z'"""
    if len(item) != 7:
        return False
    digits, suffix = item[:-1], item[-1]
    return suffix == "Z" and digits.isdigit()

Returns True if the item matches the timestamp format

def is_timerange(item: str) -> bool:
def is_timerange(item: str) -> bool:
    """Checks that the item is two four-digit groups joined by a slash (TAF to-from range)"""
    if len(item) != 9 or item[4] != "/":
        return False
    start, end = item[:4], item[5:]
    return start.isdigit() and end.isdigit()

Returns True if the item is a TAF to-from time range

def is_possible_temp(temp: str) -> bool:
def is_possible_temp(temp: str) -> bool:
    """Checks that the string contains nothing other than digits and 'M' (for minus)

    An empty string has no offending characters and is therefore accepted
    """
    for char in temp:
        if char != "M" and not char.isdigit():
            return False
    return True

Returns True if all characters are digits or 'M' (for minus)

def relative_humidity( temperature: Union[int, float], dewpoint: Union[int, float], unit: str = 'C') -> float:
def relative_humidity(
    temperature: _Numeric, dewpoint: _Numeric, unit: str = "C"
) -> float:
    """Calculates the relative humidity as a ratio where 1.0 means dewpoint equals temperature

    Both inputs are converted to Celsius first when unit is "F"
    """

    def to_celsius(value: Union[int, float]) -> float:
        """Converts a Fahrenheit value to Celsius"""
        return (value - 32) * 5 / 9

    def vapor_term(value: Union[int, float]) -> float:
        """Returns the saturation vapor pressure without the C constant,
        which would cancel out in the ratio below
        """
        return math.exp((17.67 * value) / (243.5 + value))

    if unit == "F":
        dewpoint, temperature = to_celsius(dewpoint), to_celsius(temperature)
    return vapor_term(dewpoint) / vapor_term(temperature)

Calculates the relative humidity as a 0 to 1 percentage

def pressure_altitude(pressure: float, altitude: Union[int, float], unit: str = 'inHg') -> int:
def pressure_altitude(pressure: float, altitude: _Numeric, unit: str = "inHg") -> int:
    """Calculates the pressure altitude in feet

    Pressure given with unit "hPa" is scaled to inHg before the calculation
    """
    in_hg = pressure * 0.02953 if unit == "hPa" else pressure
    # Each inHg of difference from 29.92 counts as 1000 on top of the given altitude
    offset = (29.92 - in_hg) * 1000
    return round(offset + altitude)

Calculates the pressure altitude in feet. Converts pressure units

def density_altitude( pressure: float, temperature: Union[int, float], altitude: Union[int, float], units: avwx.structs.Units) -> int:
def density_altitude(
    pressure: float, temperature: _Numeric, altitude: _Numeric, units: Units
) -> int:
    """Calculates the density altitude in feet

    Temperature is converted to Celsius when units.temperature is "F" and
    pressure is scaled to inHg when units.altimeter is "hPa"
    """
    celsius = (temperature - 32) * 5 / 9 if units.temperature == "F" else temperature
    in_hg = pressure * 0.02953 if units.altimeter == "hPa" else pressure
    # Reference temperature starts at 15 and drops 2 per 1000 of altitude
    reference = 15 - (2 * altitude / 1000)
    deviation = celsius - reference
    return round((deviation * 120) + pressure_altitude(in_hg, altitude))

Calculates the density altitude in feet. Converts pressure and temperature units

def get_station_and_time(data: List[str]) -> Tuple[List[str], Optional[str], Optional[str]]:
def get_station_and_time(
    data: List[str],
) -> Tuple[List[str], Optional[str], Optional[str]]:
    """Pops the station ident and, when present, the report time off the front of the list

    Returns the (mutated) report list, the station string, and the time string.
    A six-digit time without a trailing "Z" has the "Z" appended
    """
    if not data:
        return data, None, None
    station = data.pop(0)
    report_time: Optional[str] = None
    if data:
        candidate = data[0]
        if candidate.endswith("Z") and candidate[:-1].isdigit():
            report_time = data.pop(0)
        elif len(candidate) == 6 and candidate.isdigit():
            report_time = data.pop(0) + "Z"
    return data, station, report_time

Returns the report list and removed station ident and time strings

def is_wind(text: str) -> bool:
def is_wind(text: str) -> bool:
    """Decides whether the text looks like a normal wind element"""
    # Wind shear is not a normal wind element
    if text.startswith("WS"):
        return False
    # With a unit suffix: 09010KT, 09010G15KT
    if len(text) > 4:
        for suffix in WIND_UNITS:
            if not text.endswith(suffix):
                continue
            position = text.find(suffix)
            # The two characters right before the unit must be digits
            if text[position - 2 : position].isdigit():
                return True
    # Without a unit: 09010  09010G15 VRB10
    gust_shaped = len(text) >= 8 and "G" in text and "/" not in text
    if len(text) != 5 and not gust_shaped:
        return False
    if text[:5].isdigit():
        return True
    return text.startswith("VRB") and text[3:5].isdigit()

Returns True if the text is likely a normal wind element

VARIABLE_DIRECTION_PATTERN = re.compile('\\d{3}V\\d{3}')
def is_variable_wind_direction(text: str) -> bool:
def is_variable_wind_direction(text: str) -> bool:
    """Checks whether the first seven characters look like 350V040"""
    if len(text) >= 7:
        return VARIABLE_DIRECTION_PATTERN.match(text[:7]) is not None
    return False

Returns True if element looks like 350V040

def separate_wind(text: str) -> Tuple[str, str, str]:
def separate_wind(text: str) -> Tuple[str, str, str]:
    """Splits a wind element into its direction, speed, and gust strings

    Parts that are not present come back as empty strings
    """
    gust = ""
    gust_at = text.find("G")
    if gust_at != -1:
        # Gusts take two characters, or three for 16006GP99KT ie gust greater than
        width = 3 if "GP" in text else 2
        gust_end = gust_at + 1 + width
        gust = text[gust_at + 1 : gust_end]
        # Cut the gust group out and keep whatever surrounds it
        text = text[:gust_at] + text[gust_end:]
    # 10G18KT leaves only a two character speed
    if len(text) == 2:
        return "", text, gust
    return text[:3], text[3:], gust

Extracts the direction, speed, and gust from a wind element

def get_wind( data: List[str], units: avwx.structs.Units) -> Tuple[List[str], Optional[avwx.structs.Number], Optional[avwx.structs.Number], Optional[avwx.structs.Number], List[avwx.structs.Number]]:
def get_wind(
    data: List[str], units: Units
) -> Tuple[
    List[str],
    Optional[Number],
    Optional[Number],
    Optional[Number],
    List[Number],
]:
    """Removes the leading wind elements from the report list

    Returns the report list followed by the direction, speed, and gust values
    produced by make_number, and the list of variable direction values.
    units.wind_speed is updated when the wind element ends with a known unit
    """
    direction = speed = gust = ""
    variable: List[Number] = []
    # Main wind element: strip the unit, then split into its parts
    if data and is_wind(data[0]):
        element = data.pop(0)
        for suffix, unit in WIND_UNITS.items():
            if element.endswith(suffix):
                units.wind_speed = unit
                element = element.replace(suffix, "")
                break
        direction, speed, gust = separate_wind(element)
    # Separated Gust
    if data:
        token = data[0]
        if 1 < len(token) < 4 and token[0] == "G" and token[1:].isdigit():
            gust = data.pop(0)[1:]
    # Variable Wind Direction
    if data and is_variable_wind_direction(data[0]):
        for bound in data.pop(0).split("V"):
            parsed = make_number(bound, speak=bound, literal=True)
            if parsed is not None:
                variable.append(parsed)
    # Convert to Number
    return (
        data,
        make_number(direction, speak=direction, literal=True),
        make_number(speed.strip("BV"), m_minus=False),
        make_number(gust, m_minus=False),
        variable,
    )

Returns the report list and removed:

Direction string, speed string, gust string, variable direction list

def get_visibility( data: List[str], units: avwx.structs.Units) -> Tuple[List[str], Optional[avwx.structs.Number]]:
def get_visibility(data: List[str], units: Units) -> Tuple[List[str], Optional[Number]]:
    """Removes the leading visibility element from the report list

    Returns the report list and the visibility passed through make_number.
    units.visibility is set to "sm" or "m" when a visibility element is found
    """
    visibility = ""
    found_unit: Optional[str] = None
    if data:
        item = data[0]
        # Vis reported in statute miles
        if item.endswith("SM"):  # 10SM
            body = item[:-2]
            if body.isdigit():
                visibility = str(int(body))
            elif "/" in item:
                visibility = item[: item.find("SM")]  # 1/2SM
            else:
                visibility = body
            data.pop(0)
            found_unit = "sm"
        # Vis reported in meters
        elif len(item) == 4 and item.isdigit():
            visibility = data.pop(0)
            found_unit = "m"
        # Four digits followed by a single-letter suffix or NDV
        elif (
            5 <= len(item) <= 7
            and item[:4].isdigit()
            and (item[4] in ("M", "N", "S", "E", "W") or item[4:] == "NDV")
        ):
            visibility = data.pop(0)[:4]
            found_unit = "m"
        # Four digits with a one-letter prefix
        elif len(item) == 5 and item[1:].isdigit() and item[0] in ("M", "P", "B"):
            visibility = data.pop(0)[1:]
            found_unit = "m"
        elif item.endswith("KM"):
            visibility = f"{item[:-2]}000"
            data.pop(0)
            found_unit = "m"
        # Vis statute miles but split Ex: 2 1/2SM
        elif (
            len(data) > 1
            and data[1].endswith("SM")
            and "/" in data[1]
            and item.isdigit()
        ):
            whole = data.pop(0)  # 2
            fraction = data.pop(0).replace("SM", "")  # 1/2
            # Fold the whole number into the numerator; assumes a single-digit
            # numerator and denominator
            numerator = int(whole) * int(fraction[2]) + int(fraction[0])
            visibility = str(numerator) + fraction[1:]  # 5/2
            found_unit = "sm"
    if found_unit is not None:
        units.visibility = found_unit
    return data, make_number(visibility, m_minus=False)

Returns the report list and removed visibility string

def sanitize_cloud(cloud: str) -> str:
def sanitize_cloud(cloud: str) -> str:
    """Repairs rare formatting problems in the fourth character of a cloud layer"""
    if len(cloud) < 4:
        return cloud
    marker = cloud[3]
    # Digits, slashes, and dashes are expected here and need no repair
    if marker.isdigit() or marker in ("/", "-"):
        return cloud
    # Bad "O": FEWO03 -> FEW003
    if marker == "O":
        return f"{cloud[:3]}0{cloud[4:]}"
    # Move modifiers to end: BKNC015 -> BKN015C
    if marker != "U" and cloud[:4] not in {"BASE", "UNKN"}:
        return cloud[:3] + cloud[4:] + marker
    return cloud

Fix rare cloud layer issues

def make_cloud(cloud: str) -> avwx.structs.Cloud:
def make_cloud(cloud: str) -> Cloud:
    """Builds a Cloud dataclass out of a cloud string

    The input is assumed to be potentially valid; no validation is performed
    """
    original = cloud
    text = sanitize_cloud(cloud).replace("/", "")
    # Separate top: everything after the first matching marker
    top: Optional[str] = None
    for marker in _TOP_OFFSETS:
        found = text.find(marker)
        if found > -1:
            top = text[found + len(marker) :]
            text = text[:found]
            break
    # Separate type
    cloud_type = ""
    if text.startswith("BASES"):
        text = text[5:]
    elif text.startswith("BASE"):  # BASE027
        text = text[4:]
    elif text.startswith("VV"):  # VV003
        cloud_type, text = "VV", text[2:]
    elif len(text) >= 3 and text[:3] in CLOUD_LIST:  # FEW010
        cloud_type, text = text[:3], text[3:]
    # Second type joined by a dash: BKN-OVC065
    if len(text) > 4 and text[0] == "-" and text[1:4] in CLOUD_LIST:
        cloud_type += text[:4]
        text = text[4:]
    # Separate base
    base: Optional[str] = None
    if len(text) >= 3 and text[:3].isdigit():
        base, text = text[:3], text[3:]
    elif text.startswith("UNKN"):
        text = text[4:]
    # Remainder is considered modifiers
    modifier: Optional[str] = text or None
    # Make Cloud
    return Cloud(
        original, cloud_type or None, _null_or_int(base), _null_or_int(top), modifier
    )

Returns a Cloud dataclass for a cloud string

This function assumes the input is potentially valid

def get_clouds(data: List[str]) -> Tuple[List[str], list]:
def get_clouds(data: List[str]) -> Tuple[List[str], list]:
    """Returns the report list and removed list of split cloud layers

    Items whose first three characters are in CLOUD_LIST, or which start with
    "VV", are popped out of data (mutating it) and converted with make_cloud.

    The layers are returned sorted by (base, type). When that comparison is not
    possible, for example because a base is None, they are returned in their
    original report order instead
    """
    clouds = []
    # Walk backwards so popping an item does not shift the indexes still to visit
    for i in range(len(data) - 1, -1, -1):
        item = data[i]
        if item[:3] in CLOUD_LIST or item[:2] == "VV":
            clouds.append(make_cloud(data.pop(i)))
    # Attempt cloud sort. Fails if None values are present.
    # sorted() is used rather than list.sort() because a failed in-place sort
    # may leave the list partially reordered, which would make the fallback
    # below return a scrambled order
    try:
        clouds = sorted(clouds, key=lambda cloud: (cloud.base, cloud.type))
    except TypeError:
        clouds.reverse()  # Layers were collected back to front; restore report order
    return data, clouds

Returns the report list and removed list of split cloud layers

def get_flight_rules( visibility: Optional[avwx.structs.Number], ceiling: Optional[avwx.structs.Cloud]) -> int:
def get_flight_rules(visibility: Optional[Number], ceiling: Optional[Cloud]) -> int:
    # sourcery skip: assign-if-exp, reintroduce-else
    """Determines the flight rules as an int from parsed METAR visibility and ceiling

    0=VFR, 1=MVFR, 2=IFR, 3=LIFR

    Note: Common practice is to report no higher than IFR if visibility unavailable
    """
    # Parse visibility
    vis: Union[int, float]
    if visibility is None:
        vis = 2
    else:
        shown = visibility.repr
        if shown == "CAVOK" or shown.startswith("P6"):
            vis = 10
        elif shown.startswith("M"):
            vis = 0
        elif visibility.value is None:
            vis = 2
        # Convert meters to miles
        elif len(shown) == 4:
            vis = (visibility.value or 0) * 0.000621371
        else:
            vis = visibility.value or 0
    # Parse ceiling: a missing ceiling or missing base counts as 99
    cld = 99
    if ceiling and ceiling.base:
        cld = ceiling.base
    # Determine flight rules, checking the most restrictive limits first
    if vis < 1 or cld < 5:
        return 3  # LIFR
    if vis < 3 or cld < 10:
        return 2  # IFR
    if vis <= 5 or cld <= 30:
        return 1  # MVFR
    return 0  # VFR

Returns int based on current flight rules from parsed METAR data

0=VFR, 1=MVFR, 2=IFR, 3=LIFR

Note: Common practice is to report no higher than IFR if visibility unavailable

def get_ceiling(clouds: List[avwx.structs.Cloud]) -> Optional[avwx.structs.Cloud]:
def get_ceiling(clouds: List[Cloud]) -> Optional[Cloud]:
    """Picks the first ceiling layer out of a Cloud list, or None if there is none

    The clouds are assumed to already be sorted lowest to highest

    Only 'Broken', 'Overcast', and 'Vertical Visibility' count as ceilings

    Layers without a base are skipped, which prevents errors due to lack of
    cloud information (eg. '' or 'FEW///')
    """
    for layer in clouds:
        if layer.base and layer.type in {"OVC", "BKN", "VV"}:
            return layer
    return None

Returns ceiling layer from Cloud-List or None if none found

Assumes that the clouds are already sorted lowest to highest

Only 'Broken', 'Overcast', and 'Vertical Visibility' are considered ceilings

Prevents errors due to lack of cloud information (eg. '' or 'FEW///')

def is_altitude(value: str) -> bool:
582def is_altitude(value: str) -> bool:
583    """Returns True if the value is a possible altitude"""
584    if len(value) < 5:
585        return False
586    if value.startswith("SFC/"):
587        return True
588    if value.startswith("FL") and value[2:5].isdigit():
589        return True
590    first, *_ = value.split("/")
591    return bool(first[-2:] == "FT" and first[-5:-2].isdigit())

Returns True if the value is a possible altitude

def make_altitude( value: str, units: avwx.structs.Units, repr: Optional[str] = None, force_fl: bool = False) -> Tuple[Optional[avwx.structs.Number], avwx.structs.Units]:
def make_altitude(
    value: str, units: Units, repr: Optional[str] = None, force_fl: bool = False
) -> Tuple[Optional[Number], Units]:
    """Convert altitude string into a number

    value is the altitude text, optionally ending in "FT" or "M"
    units has its altitude attribute set when such a suffix is found
    repr overrides the representation handed to make_number; defaults to value
    force_fl prefixes the value with "FL" unless a unit suffix was found

    Returns the make_number result, or None when there is nothing to parse,
    together with the units object
    """
    if not value:
        return None, units
    raw = repr or value
    for end in ("FT", "M"):
        if value.endswith(end):
            # An explicit unit takes priority over the flight level request
            force_fl = False
            units.altitude = end.lower()
            # post 3.8 value = value.removesuffix(end)
            value = value[: -len(end)]
    # A bare unit such as "FT" leaves nothing to parse, and indexing the
    # empty string below would raise IndexError
    if not value:
        return None, units
    # F430
    if value[0] == "F" and value[1:].isdigit():
        value = f"FL{value[1:]}"
    if force_fl and value[:2] != "FL":
        value = f"FL{value}"
    return make_number(value, repr=raw), units

Convert altitude string into a number

def parse_date( date: str, hour_threshold: int = 200, time_only: bool = False, target: Optional[datetime.date] = None) -> Optional[datetime.datetime]:
def parse_date(
    date: str,
    hour_threshold: int = 200,
    time_only: bool = False,
    target: Optional[dt.date] = None,
) -> Optional[dt.datetime]:
    """Parses a report timestamp in ddhhZ or ddhhmmZ format

    If time_only, the format is hhmm and the day comes from the target date

    The timestamp is assumed to fall within hour_threshold hours of the target
    date (the current UTC time when no target is given); otherwise the guess is
    moved by one month. Returns None when the text cannot be parsed
    """
    # pylint: disable=too-many-branches
    # Format date string
    digits = date.strip("Z")
    if not digits.isdigit():
        return None
    if time_only:
        if len(digits) != 4:
            return None
        hour_at = 0
    else:
        # ddhh gets zero minutes appended
        if len(digits) == 4:
            digits += "00"
        if len(digits) != 6:
            return None
        hour_at = 2
    minute_at = hour_at + 2
    # Create initial guess
    if target:
        reference = dt.datetime(
            target.year, target.month, target.day, tzinfo=dt.timezone.utc
        )
    else:
        reference = dt.datetime.now(tz=dt.timezone.utc)
    day = reference.day if time_only else int(digits[:2])
    hour = int(digits[hour_at:minute_at])
    # Handle situation where the reference month has fewer days than the report day.
    # Remembering the shift makes sure that a month shift doesn't happen twice
    shifted = day > monthrange(reference.year, reference.month)[1]
    if shifted:
        reference += relativedelta(months=-1)
    try:
        guess = reference.replace(
            day=day,
            hour=hour % 24,
            minute=int(digits[minute_at : minute_at + 2]) % 60,
            second=0,
            microsecond=0,
        )
    except ValueError:
        return None
    # Handle overflow hour
    if hour > 23:
        guess += dt.timedelta(days=1)
    if shifted:
        return guess
    # Handle changing months when the guess is too far from the reference
    hourdiff = (guess - reference) / dt.timedelta(minutes=1) / 60
    if hourdiff > hour_threshold:
        guess += relativedelta(months=-1)
    elif hourdiff < -hour_threshold:
        guess += relativedelta(months=+1)
    return guess

Parses a report timestamp in ddhhZ or ddhhmmZ format

If time_only, assumes hhmm format with current or previous day

This function assumes the given timestamp is within the hour threshold from current date

def make_timestamp( timestamp: Optional[str], time_only: bool = False, target_date: Optional[datetime.date] = None) -> Optional[avwx.structs.Timestamp]:
def make_timestamp(
    timestamp: Optional[str],
    time_only: bool = False,
    target_date: Optional[dt.date] = None,
) -> Optional[Timestamp]:
    """Builds a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format

    Returns None for an empty or missing timestamp. The Timestamp pairs the
    original text with the result of parse_date
    """
    if timestamp:
        parsed = parse_date(timestamp, time_only=time_only, target=target_date)
        return Timestamp(timestamp, parsed)
    return None

Returns a Timestamp dataclass for a report timestamp in ddhhZ or ddhhmmZ format

def is_runway_visibility(item: str) -> bool:
def is_runway_visibility(item: str) -> bool:
    """Checks whether the item is a runway visibility range string"""
    if len(item) <= 4 or item[0] != "R":
        return False
    # The slash follows either a two or a three character runway ident
    if "/" not in (item[3], item[4]):
        return False
    # R28/CLRD70 is a runway state, not a visibility range
    return item[1:3].isdigit() and "CLRD" not in item

Returns True if the item is a runway visibility range string