avwx.parsing.remarks

Contains functions for handling and translating remarks

  1"""
  2Contains functions for handling and translating remarks
  3"""
  4
  5# pylint: disable=redefined-builtin
  6
  7# stdlib
  8from contextlib import suppress
  9from typing import List, Optional, Tuple
 10
 11# module
 12from avwx.parsing import core
 13from avwx.static.core import REMARKS_ELEMENTS, REMARKS_GROUPS, WX_TRANSLATIONS
 14from avwx.static.taf import PRESSURE_TENDENCIES
 15from avwx.structs import Code, FiveDigitCodes, Number, PressureTendency, RemarksData
 16
 17
 18Codes = List[str]
 19
 20
 21def decimal_code(code: str, repr: Optional[str] = None) -> Optional[Number]:
 22    """Parses a 4-digit decimal temperature representation
 23
 24    Ex: 1045 -> -4.5    0237 -> 23.7
 25    """
 26    if not code:
 27        return None
 28    number = f"{'-' if code[0] == '1' else ''}{int(code[1:3])}.{code[3]}"
 29    return core.make_number(number, repr or code)
 30
 31
 32def temp_dew_decimal(codes: Codes) -> Tuple[Codes, Optional[Number], Optional[Number]]:
 33    """Returns the decimal temperature and dewpoint values"""
 34    temp, dew = None, None
 35    for i, code in reversed(list(enumerate(codes))):
 36        if len(code) in {5, 9} and code[0] == "T" and code[1:].isdigit():
 37            codes.pop(i)
 38            temp, dew = decimal_code(code[1:5]), decimal_code(code[5:])
 39            break
 40    return codes, temp, dew
 41
 42
 43def temp_minmax(codes: Codes) -> Tuple[Codes, Optional[Number], Optional[Number]]:
 44    """Returns the 24-hour minimum and maximum temperatures"""
 45    maximum, minimum = None, None
 46    for i, code in enumerate(codes):
 47        if len(code) == 9 and code[0] == "4" and code.isdigit():
 48            maximum, minimum = decimal_code(code[1:5]), decimal_code(code[5:])
 49            codes.pop(i)
 50            break
 51    return codes, maximum, minimum
 52
 53
 54def precip_snow(codes: Codes) -> Tuple[Codes, Optional[Number], Optional[Number]]:
 55    """Returns the hourly precipitation and snow depth"""
 56    precip, snow = None, None
 57    for i, code in reversed(list(enumerate(codes))):
 58        if len(code) != 5:
 59            continue
 60        # P0213
 61        if code[0] == "P" and code[1:].isdigit():
 62            precip = core.make_number(f"{code[1:3]}.{code[3:]}", code)
 63            codes.pop(i)
 64        # 4/012
 65        elif code[:2] == "4/" and code[2:].isdigit():
 66            snow = core.make_number(code[2:], code)
 67            codes.pop(i)
 68    return codes, precip, snow
 69
 70
 71def sea_level_pressure(codes: Codes) -> Tuple[Codes, Optional[Number]]:
 72    """Returns the sea level pressure always in hPa"""
 73    sea = None
 74    for i, code in enumerate(codes):
 75        if len(code) == 6 and code.startswith("SLP") and code[-3:].isdigit():
 76            value = f"{'9' if int(code[-3]) > 4 else '10'}{code[-3:-1]}.{code[-1]}"
 77            sea = core.make_number(value, code)
 78            codes.pop(i)
 79            break
 80    return codes, sea
 81
 82
 83def parse_pressure(code: str) -> PressureTendency:
 84    """Parse a 5-digit pressure tendency"""
 85    return PressureTendency(
 86        repr=code,
 87        tendency=PRESSURE_TENDENCIES[code[1]],
 88        change=float(f"{code[2:4]}.{code[4]}"),
 89    )
 90
 91
 92def parse_precipitation(code: str) -> Optional[Number]:
 93    """Parse a 5-digit precipitation amount"""
 94    return core.make_number(f"{code[1:3]}.{code[3:]}", code)
 95
 96
 97def five_digit_codes(codes: Codes) -> Tuple[Codes, FiveDigitCodes]:
 98    """Returns  a 5-digit min/max temperature code"""
 99    values = FiveDigitCodes()
100    for i, code in reversed(list(enumerate(codes))):
101        if len(code) == 5 and code.isdigit():
102            key = int(code[0])
103            if key == 1:
104                values.maximum_temperature_6 = decimal_code(code[1:], code)
105            elif key == 2:
106                values.minimum_temperature_6 = decimal_code(code[1:], code)
107            elif key == 5:
108                values.pressure_tendency = parse_pressure(code)
109            elif key == 6:
110                values.precip_36_hours = parse_precipitation(code)
111            elif key == 7:
112                values.precip_24_hours = parse_precipitation(code)
113            elif key == 9:
114                values.sunshine_minutes = core.make_number(code[2:], code)
115            else:
116                continue
117            codes.pop(i)
118    return codes, values
119
120
def find_codes(rmk: str) -> Tuple[Codes, List[Code]]:
    """Find and remove known static codes from the starting remarks string

    Works in three steps:
      1. Every REMARKS_GROUPS key found in `rmk` is recorded and its text is
         removed from the string.
      2. The rest is split on whitespace; items that are REMARKS_ELEMENTS keys
         are recorded and removed.
      3. Items shaped like "<wx><B|E><mm>" whose two-letter prefix is in
         WX_TRANSLATIONS are recorded as weather began/ended times and removed.

    Returns the items that were not recognised and the recognised codes sorted
    by their repr.
    """
    ret = []
    for key, value in REMARKS_GROUPS.items():
        if key in rmk:
            ret.append(Code(key, value))
            # str.replace returns a new string; the result must be kept or the
            # group's words stay in the remarks and get processed a second time
            rmk = rmk.replace(key, "")
    # split() without arguments never yields empty strings
    codes = rmk.split()
    for i, code in reversed(list(enumerate(codes))):
        try:
            translation = REMARKS_ELEMENTS[code]
        except KeyError:
            pass
        else:
            ret.append(Code(code, translation))
            codes.pop(i)
            # Already consumed: falling through could pop index i again and
            # drop an unrelated item (or raise IndexError at the end)
            continue
        # Weather began/ended
        if (
            len(code) == 5
            and code[2] in ("B", "E")
            and code[3:].isdigit()
            and code[:2] in WX_TRANSLATIONS
        ):
            state = "began" if code[2] == "B" else "ended"
            value = f"{WX_TRANSLATIONS[code[:2]]} {state} at :{code[3:]}"
            ret.append(Code(code, value))
            codes.pop(i)
    ret.sort(key=lambda x: x.repr)
    return codes, ret
146
147
def parse(rmk: str) -> Optional[RemarksData]:
    """Parse a remarks string into a RemarksData structure

    Returns None for an empty string. Otherwise the extractors run in a fixed
    order, each one consuming the items it recognises from the shared list:
    static codes, decimal temperature/dewpoint, 24-hour temperature extremes,
    hourly precipitation and snow depth, sea level pressure, and finally the
    5-digit groups.
    """
    if not rmk:
        return None
    remaining, static_codes = find_codes(rmk)
    remaining, temperature, dewpoint = temp_dew_decimal(remaining)
    remaining, max_24, min_24 = temp_minmax(remaining)
    remaining, hourly_precip, snow_depth = precip_snow(remaining)
    remaining, slp = sea_level_pressure(remaining)
    remaining, groups = five_digit_codes(remaining)
    return RemarksData(
        codes=static_codes,
        temperature_decimal=temperature,
        dewpoint_decimal=dewpoint,
        maximum_temperature_24=max_24,
        minimum_temperature_24=min_24,
        maximum_temperature_6=groups.maximum_temperature_6,
        minimum_temperature_6=groups.minimum_temperature_6,
        pressure_tendency=groups.pressure_tendency,
        precip_36_hours=groups.precip_36_hours,
        precip_24_hours=groups.precip_24_hours,
        sunshine_minutes=groups.sunshine_minutes,
        precip_hourly=hourly_precip,
        snow_depth=snow_depth,
        sea_level_pressure=slp,
    )
Codes = typing.List[str]
def decimal_code(code: str, repr: Optional[str] = None) -> Optional[avwx.structs.Number]:
def decimal_code(code: str, repr: Optional[str] = None) -> Optional[Number]:
    """Decode a 4-character sign-and-tenths temperature group

    The first character is a sign flag ("1" means negative), characters two
    and three are the whole part, and the fourth character is the tenths digit.

    Ex: 1045 -> -4.5    0237 -> 23.7

    Returns None for an empty code. Otherwise the formatted value is passed to
    core.make_number together with `repr`, falling back to `code` when no
    `repr` is supplied.
    """
    if not code:
        return None
    sign = "-" if code[0] == "1" else ""
    # int() drops the leading zero of the whole part: "02" -> 2
    whole = int(code[1:3])
    tenths = code[3]
    return core.make_number(f"{sign}{whole}.{tenths}", repr or code)

Parses a 4-digit decimal temperature representation

Ex: 1045 -> -4.5 0237 -> 23.7

def temp_dew_decimal( codes: List[str]) -> Tuple[List[str], Optional[avwx.structs.Number], Optional[avwx.structs.Number]]:
def temp_dew_decimal(codes: Codes) -> Tuple[Codes, Optional[Number], Optional[Number]]:
    """Extract the decimal temperature and dewpoint from a T-group

    Scans from the end of the list for the last item shaped like "T" followed
    by 4 or 8 digits. That item is removed from `codes` in place and its two
    halves are decoded with decimal_code. With only 4 digits the dewpoint
    slice is empty and decodes to None.

    Returns the (mutated) list, the temperature, and the dewpoint. Both values
    are None when no matching item exists.
    """
    for index in range(len(codes) - 1, -1, -1):
        item = codes[index]
        if len(item) not in (5, 9) or item[0] != "T" or not item[1:].isdigit():
            continue
        del codes[index]
        return codes, decimal_code(item[1:5]), decimal_code(item[5:])
    return codes, None, None

Returns the decimal temperature and dewpoint values

def temp_minmax( codes: List[str]) -> Tuple[List[str], Optional[avwx.structs.Number], Optional[avwx.structs.Number]]:
def temp_minmax(codes: Codes) -> Tuple[Codes, Optional[Number], Optional[Number]]:
    """Extract the 24-hour maximum and minimum temperatures

    Looks for the first 9-digit item that starts with "4". Digits 2-5 decode
    to the maximum and digits 6-9 to the minimum (both via decimal_code); the
    item is then removed from `codes` in place.

    Returns the (mutated) list, the maximum, and the minimum. Both values are
    None when no matching item exists.
    """
    for index, item in enumerate(codes):
        if len(item) != 9 or item[0] != "4" or not item.isdigit():
            continue
        high = decimal_code(item[1:5])
        low = decimal_code(item[5:])
        codes.pop(index)
        return codes, high, low
    return codes, None, None

Returns the 24-hour minimum and maximum temperatures

def precip_snow( codes: List[str]) -> Tuple[List[str], Optional[avwx.structs.Number], Optional[avwx.structs.Number]]:
def precip_snow(codes: Codes) -> Tuple[Codes, Optional[Number], Optional[Number]]:
    """Extract the hourly precipitation and snow depth

    Only 5-character items are considered:
      - "P" + 4 digits (P0213) becomes the precipitation "02.13"
      - "4/" + 3 digits (4/012) becomes the snow depth "012"

    Every matching item is removed from `codes` in place. The list is walked
    from the end and the scan never stops early, so when a pattern appears
    more than once the occurrence nearest the front provides the final value.
    """
    precip = snow = None
    index = len(codes)
    while index:
        index -= 1
        item = codes[index]
        if len(item) != 5:
            continue
        if item[0] == "P" and item[1:].isdigit():
            precip = core.make_number(f"{item[1:3]}.{item[3:]}", item)
        elif item[:2] == "4/" and item[2:].isdigit():
            snow = core.make_number(item[2:], item)
        else:
            continue
        # Safe while walking backwards: only positions already visited shift
        codes.pop(index)
    return codes, precip, snow

Returns the hourly precipitation and snow depth

def sea_level_pressure(codes: List[str]) -> Tuple[List[str], Optional[avwx.structs.Number]]:
def sea_level_pressure(codes: Codes) -> Tuple[Codes, Optional[Number]]:
    """Extract the sea level pressure from the first SLPxxx item

    The three digits are the tens, ones, and tenths of the value. The leading
    "9" or "10" is not transmitted, so it is restored from the first digit:
    above 4 gives "9", otherwise "10".

    Ex: SLP132 -> 1013.2    SLP985 -> 998.5

    The matching item is removed from `codes` in place. Returns the (mutated)
    list and the value, or None when no item matches.
    """
    for index, item in enumerate(codes):
        if not (len(item) == 6 and item.startswith("SLP") and item[-3:].isdigit()):
            continue
        digits = item[-3:]
        prefix = "9" if int(digits[0]) > 4 else "10"
        sea = core.make_number(f"{prefix}{digits[:2]}.{digits[2]}", item)
        codes.pop(index)
        return codes, sea
    return codes, None

Returns the sea level pressure always in hPa

def parse_pressure(code: str) -> avwx.structs.PressureTendency:
def parse_pressure(code: str) -> PressureTendency:
    """Build a PressureTendency from a 5-digit group

    The second character is looked up in PRESSURE_TENDENCIES (a missing key
    raises KeyError) and the last three characters become a float with one
    decimal place. Ex: "52032" -> key "2", change 3.2
    """
    tendency = PRESSURE_TENDENCIES[code[1]]
    change = float(f"{code[2:4]}.{code[4]}")
    return PressureTendency(repr=code, tendency=tendency, change=change)

Parse a 5-digit pressure tendency

def parse_precipitation(code: str) -> Optional[avwx.structs.Number]:
def parse_precipitation(code: str) -> Optional[Number]:
    """Build a precipitation amount from a 5-digit group

    The leading character is skipped and a decimal point is placed after the
    next two characters. Ex: "60217" -> "02.17"
    """
    whole, fraction = code[1:3], code[3:]
    return core.make_number(f"{whole}.{fraction}", code)

Parse a 5-digit precipitation amount

def five_digit_codes(codes: List[str]) -> Tuple[List[str], avwx.structs.FiveDigitCodes]:
 98def five_digit_codes(codes: Codes) -> Tuple[Codes, FiveDigitCodes]:
 99    """Returns  a 5-digit min/max temperature code"""
100    values = FiveDigitCodes()
101    for i, code in reversed(list(enumerate(codes))):
102        if len(code) == 5 and code.isdigit():
103            key = int(code[0])
104            if key == 1:
105                values.maximum_temperature_6 = decimal_code(code[1:], code)
106            elif key == 2:
107                values.minimum_temperature_6 = decimal_code(code[1:], code)
108            elif key == 5:
109                values.pressure_tendency = parse_pressure(code)
110            elif key == 6:
111                values.precip_36_hours = parse_precipitation(code)
112            elif key == 7:
113                values.precip_24_hours = parse_precipitation(code)
114            elif key == 9:
115                values.sunshine_minutes = core.make_number(code[2:], code)
116            else:
117                continue
118            codes.pop(i)
119    return codes, values

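Returns the values parsed from every recognised 5-digit group (6-hour max/min temperature, pressure tendency, precipitation amounts, sunshine minutes) and removes those groups from the list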

def find_codes(rmk: str) -> Tuple[List[str], List[avwx.structs.Code]]:
def find_codes(rmk: str) -> Tuple[Codes, List[Code]]:
    """Find and remove known static codes from the starting remarks string

    Works in three steps:
      1. Every REMARKS_GROUPS key found in `rmk` is recorded and its text is
         removed from the string.
      2. The rest is split on whitespace; items that are REMARKS_ELEMENTS keys
         are recorded and removed.
      3. Items shaped like "<wx><B|E><mm>" whose two-letter prefix is in
         WX_TRANSLATIONS are recorded as weather began/ended times and removed.

    Returns the items that were not recognised and the recognised codes sorted
    by their repr.
    """
    ret = []
    for key, value in REMARKS_GROUPS.items():
        if key in rmk:
            ret.append(Code(key, value))
            # str.replace returns a new string; the result must be kept or the
            # group's words stay in the remarks and get processed a second time
            rmk = rmk.replace(key, "")
    # split() without arguments never yields empty strings
    codes = rmk.split()
    for i, code in reversed(list(enumerate(codes))):
        try:
            translation = REMARKS_ELEMENTS[code]
        except KeyError:
            pass
        else:
            ret.append(Code(code, translation))
            codes.pop(i)
            # Already consumed: falling through could pop index i again and
            # drop an unrelated item (or raise IndexError at the end)
            continue
        # Weather began/ended
        if (
            len(code) == 5
            and code[2] in ("B", "E")
            and code[3:].isdigit()
            and code[:2] in WX_TRANSLATIONS
        ):
            state = "began" if code[2] == "B" else "ended"
            value = f"{WX_TRANSLATIONS[code[:2]]} {state} at :{code[3:]}"
            ret.append(Code(code, value))
            codes.pop(i)
    ret.sort(key=lambda x: x.repr)
    return codes, ret

Find and remove known static codes from the starting remarks list

def parse(rmk: str) -> Optional[avwx.structs.RemarksData]:
def parse(rmk: str) -> Optional[RemarksData]:
    """Parse a remarks string into a RemarksData structure

    Returns None for an empty string. Otherwise the extractors run in a fixed
    order, each one consuming the items it recognises from the shared list:
    static codes, decimal temperature/dewpoint, 24-hour temperature extremes,
    hourly precipitation and snow depth, sea level pressure, and finally the
    5-digit groups.
    """
    if not rmk:
        return None
    remaining, static_codes = find_codes(rmk)
    remaining, temperature, dewpoint = temp_dew_decimal(remaining)
    remaining, max_24, min_24 = temp_minmax(remaining)
    remaining, hourly_precip, snow_depth = precip_snow(remaining)
    remaining, slp = sea_level_pressure(remaining)
    remaining, groups = five_digit_codes(remaining)
    return RemarksData(
        codes=static_codes,
        temperature_decimal=temperature,
        dewpoint_decimal=dewpoint,
        maximum_temperature_24=max_24,
        minimum_temperature_24=min_24,
        maximum_temperature_6=groups.maximum_temperature_6,
        minimum_temperature_6=groups.minimum_temperature_6,
        pressure_tendency=groups.pressure_tendency,
        precip_36_hours=groups.precip_36_hours,
        precip_24_hours=groups.precip_24_hours,
        sunshine_minutes=groups.sunshine_minutes,
        precip_hourly=hourly_precip,
        snow_depth=snow_depth,
        sea_level_pressure=slp,
    )

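Parses the remarks string into a RemarksData structure: static codes, decimal temperature and dewpoint, 24-hour and 6-hour temperature extremes, precipitation, snow depth, sea level pressure, pressure tendency, and sunshine minutes. Returns None for an empty string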