avwx.current.metar

A METAR (Meteorological Aerodrome Report) is the surface weather observed at most controlled (and some uncontrolled) airports. They are updated once per hour or when conditions change enough to warrant an update, and the observations are valid for one hour after the report was issued or until the next report is issued.

  1"""
  2A METAR (Meteorological Aerodrome Report) is the surface weather observed at
  3most controlled (and some uncontrolled) airports. They are updated once per
  4hour or when conditions change enough to warrant an update, and the
  5observations are valid for one hour after the report was issued or until the
  6next report is issued.
  7"""
  8
  9# pylint: disable=invalid-overridden-method
 10
 11# stdlib
 12from contextlib import suppress
 13from datetime import date, datetime, timedelta, timezone
 14from typing import List, Tuple, Optional
 15
 16# module
 17from avwx.current.base import Report, get_wx_codes
 18from avwx.parsing import core, remarks, speech, summary
 19from avwx.parsing.sanitization.metar import clean_metar_list, clean_metar_string
 20from avwx.parsing.translate.metar import translate_metar
 21from avwx.service import NOAA
 22from avwx.static.core import FLIGHT_RULES
 23from avwx.static.metar import METAR_RMK
 24from avwx.station import uses_na_format, valid_station
 25from avwx.structs import (
 26    Code,
 27    MetarData,
 28    MetarTrans,
 29    Number,
 30    RemarksData,
 31    RunwayVisibility,
 32    Sanitization,
 33    Units,
 34)
 35
 36
 37class Metar(Report):
 38    """
 39    The Metar class offers an object-oriented approach to managing METAR data
 40    for a single station.
 41
 42    Below is typical usage for fetching and pulling METAR data for KJFK.
 43
 44    ```python
 45    >>> from avwx import Metar
 46    >>> kjfk = Metar("KJFK")
 47    >>> kjfk.station.name
 48    'John F Kennedy International Airport'
 49    >>> kjfk.update()
 50    True
 51    >>> kjfk.last_updated
 52    datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
 53    >>> kjfk.raw
 54    'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
 55    >>> kjfk.data.flight_rules
 56    'VFR'
 57    >>> kjfk.translations.remarks
 58    {'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}
 59    ```
 60
 61    The `parse` and `from_report` methods can parse a report string if you want
 62    to override the normal fetching process. Here's an example of a really bad
 63    day.
 64
 65    ```python
 66    >>> from avwx import Metar
 67    >>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
 68    >>> ksfo = Metar.from_report(report)
 69    True
 70    >>> ksfo.station.city
 71    'San Francisco'
 72    >>> ksfo.last_updated
 73    datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
 74    >>> ksfo.data.flight_rules
 75    'LIFR'
 76    >>> ksfo.translations.clouds
 77    'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
 78    >>> ksfo.summary
 79    'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
 80    ```
 81    """
 82
 83    data: Optional[MetarData] = None
 84    translations: Optional[MetarTrans] = None
 85
 86    async def _pull_from_default(self) -> None:
 87        """Checks for a more recent report from NOAA. Only sync"""
 88        service = NOAA(self.__class__.__name__.lower())
 89        if self.code is None:
 90            return
 91        report = await service.async_fetch(self.code)
 92        if report is not None:
 93            data, units, sans = parse(self.code, report, self.issued)
 94            if not data or data.time is None or data.time.dt is None:
 95                return
 96            if (
 97                not self.data
 98                or self.data.time is None
 99                or self.data.time.dt is None
100                or data.time.dt > self.data.time.dt
101            ):
102                self.data, self.units, self.sanitization = data, units, sans
103                self.source = service.root
104
105    @property
106    def _should_check_default(self) -> bool:
107        """Returns True if pulled from regional source and potentially out of date"""
108        if isinstance(self.service, NOAA) or self.source is None:
109            return False
110
111        if self.data is None or self.data.time is None or self.data.time.dt is None:
112            return True
113        time_since = datetime.now(tz=timezone.utc) - self.data.time.dt
114        return time_since > timedelta(minutes=90)
115
116    def _calculate_altitudes(self) -> None:
117        """Adds the pressure and density altitudes to data if all fields are available"""
118        if self.data is None or self.station is None or self.units is None:
119            return
120        # Select decimal temperature if available
121        temp = self.data.temperature
122        if self.data.remarks_info is not None:
123            temp = self.data.remarks_info.temperature_decimal or temp
124        alt = self.data.altimeter
125        if temp is None or temp.value is None or alt is None or alt.value is None:
126            return
127        elev = self.station.elevation_ft
128        if elev is None:
129            return
130        self.data.pressure_altitude = core.pressure_altitude(
131            alt.value, elev, self.units.altimeter
132        )
133        self.data.density_altitude = core.density_altitude(
134            alt.value, temp.value, elev, self.units
135        )
136
137    async def _post_update(self) -> None:
138        if self.code is None or self.raw is None:
139            return
140        self.data, self.units, self.sanitization = parse(
141            self.code, self.raw, self.issued
142        )
143        if self._should_check_default:
144            await self._pull_from_default()
145        if self.data is None or self.units is None:
146            return
147        self._calculate_altitudes()
148        self.translations = translate_metar(self.data, self.units)
149
150    def _post_parse(self) -> None:
151        if self.code is None or self.raw is None:
152            return
153        self.data, self.units, self.sanitization = parse(
154            self.code, self.raw, self.issued
155        )
156        if self.data is None or self.units is None:
157            return
158        self._calculate_altitudes()
159        self.translations = translate_metar(self.data, self.units)
160
161    @staticmethod
162    def sanitize(report: str) -> str:
163        """Sanitizes a METAR string"""
164        return sanitize(report)[0]
165
166    @property
167    def summary(self) -> Optional[str]:
168        """Condensed report summary created from translations"""
169        if not self.translations:
170            self.update()
171        return None if self.translations is None else summary.metar(self.translations)
172
173    @property
174    def speech(self) -> Optional[str]:
175        """Report summary designed to be read by a text-to-speech program"""
176        if not self.data:
177            self.update()
178        if self.data is None or self.units is None:
179            return None
180        return speech.metar(self.data, self.units)
181
182
183def get_remarks(txt: str) -> Tuple[List[str], str]:
184    """Returns the report split into components and the remarks string
185
186    Remarks can include items like RMK and on, NOSIG and on, and BECMG and on
187    """
188    txt = txt.replace("?", "").strip()
189    # First look for Altimeter in txt
190    alt_index = len(txt) + 1
191    for item in [" A2", " A3", " Q1", " Q0", " Q9"]:
192        index = txt.find(item)
193        if len(txt) - 6 > index > -1 and txt[index + 2 : index + 6].isdigit():
194            alt_index = index
195    # Then look for earliest remarks 'signifier'
196    sig_index = core.find_first_in_list(txt, METAR_RMK)
197    if sig_index == -1:
198        sig_index = len(txt) + 1
199    if sig_index > alt_index > -1:
200        return txt[: alt_index + 6].strip().split(), txt[alt_index + 7 :]
201    if alt_index > sig_index > -1:
202        return txt[:sig_index].strip().split(), txt[sig_index + 1 :]
203    return txt.strip().split(), ""
204
205
206_RVR_CODES = {
207    "M": "less than",
208    "A": "greater than",
209    "P": "greater than",
210    "U": "increasing",
211    "I": "increasing",
212    "D": "decreasing",
213    "F": "decreasing",
214    "R": "decreasing",
215    "N": "no change",
216    "V": "variable",
217}
218
219
220def _parse_rvr_number(value: str) -> Optional[Number]:
221    if not value:
222        return None
223    raw, prefix = value, None
224    with suppress(KeyError):
225        prefix = _RVR_CODES[value[0]]
226        value = value[1:]
227    number = core.make_number(value, raw)
228    if number is not None and prefix is not None:
229        number.spoken = f"{prefix} {number.spoken}"
230        number.value = None
231    return number
232
233
234def parse_runway_visibility(value: str) -> RunwayVisibility:
235    """Parse a runway visibility range string"""
236    raw, trend = value, None
237    # TODO: update to check and convert units post visibility parse
238    value = value.replace("FT", "")
239    with suppress(KeyError):
240        trend = Code(value[-1], _RVR_CODES[value[-1]])
241        value = value[:-1]
242    runway, value, *_ = value[1:].split("/")
243    if value:
244        possible_numbers = [_parse_rvr_number(n) for n in value.split("V")]
245        numbers = [n for n in possible_numbers if n is not None]
246        visibility = numbers.pop() if len(numbers) == 1 else None
247    else:
248        visibility, numbers = None, []
249    return RunwayVisibility(
250        repr=raw,
251        runway=runway,
252        visibility=visibility,
253        variable_visibility=numbers,
254        trend=trend,
255    )
256
257
258def get_runway_visibility(data: List[str]) -> Tuple[List[str], List[RunwayVisibility]]:
259    """Returns the report list and the remove runway visibility list"""
260    runway_vis = [
261        parse_runway_visibility(data.pop(i))
262        for i, item in reversed(list(enumerate(data)))
263        if core.is_runway_visibility(item)
264    ]
265    runway_vis.sort(key=lambda x: x.runway)
266    return data, runway_vis
267
268
269def parse_altimeter(value: str) -> Optional[Number]:
270    """Parse an altimeter string into a Number"""
271    if not value or len(value) < 4:
272        return None
273    # QNH3003INS
274    if len(value) >= 7 and value.endswith("INS"):
275        return core.make_number(f"{value[-7:-5]}.{value[-5:-3]}", value, literal=True)
276    number = value.replace(".", "")
277    # Q1000/10
278    if "/" in number:
279        number = number.split("/")[0]
280    if number.startswith("QNH"):
281        number = f"Q{number[1:]}"
282    if not (len(number) in {4, 5} and number[-4:].isdigit()):
283        return None
284    number = number.lstrip("AQ")
285    if number[0] in ("2", "3"):
286        number = f"{number[:2]}.{number[2:]}"
287    elif number[0] not in ("0", "1"):
288        return None
289    return core.make_number(number, value, number, literal=True)
290
291
292def get_altimeter(
293    data: List[str], units: Units, version: str = "NA"
294) -> Tuple[List[str], Optional[Number]]:
295    """Returns the report list and the removed altimeter item
296
297    Version is 'NA' (North American / default) or 'IN' (International)
298    """
299    values: List[Number] = []
300    for _ in range(2):
301        if not data:
302            break
303        value = parse_altimeter(data[-1])
304        if value is None:
305            break
306        values.append(value)
307        data.pop(-1)
308    if not values:
309        return data, None
310    values.sort(key=lambda x: x.value or 0)
311    altimeter = values[0 if version == "NA" else -1]
312    if altimeter.value is not None:
313        units.altimeter = "inHg" if altimeter.value < 100 else "hPa"
314    return data, altimeter
315
316
317def get_temp_and_dew(
318    data: List[str],
319) -> Tuple[List[str], Optional[Number], Optional[Number]]:
320    """Returns the report list and removed temperature and dewpoint strings"""
321    for i, item in reversed(list(enumerate(data))):
322        if "/" in item:
323            # ///07
324            if item[0] == "/":
325                item = "/" + item.lstrip("/")
326            # 07///
327            elif item[-1] == "/":
328                item = item.rstrip("/") + "/"
329            tempdew = item.split("/")
330            if len(tempdew) != 2:
331                continue
332            valid = True
333            for j, temp in enumerate(tempdew):
334                if temp in ["MM", "XX"]:
335                    tempdew[j] = ""
336                elif not core.is_possible_temp(temp):
337                    valid = False
338                    break
339            if valid:
340                data.pop(i)
341                temp, dew = tempdew
342                return data, core.make_number(temp), core.make_number(dew)
343    return data, None, None
344
345
346def get_relative_humidity(
347    temperature: Optional[Number],
348    dewpoint: Optional[Number],
349    remarks_info: Optional[RemarksData],
350    units: Units,
351) -> Optional[float]:
352    """Calculates relative humidity from preferred temperature and dewpoint"""
353    if remarks_info is not None:
354        temp = remarks_info.temperature_decimal or temperature
355        dew = remarks_info.dewpoint_decimal or dewpoint
356    else:
357        temp = temperature
358        dew = dewpoint
359    if temp is None or temp.value is None:
360        return None
361    if dew is None or dew.value is None:
362        return None
363    return core.relative_humidity(temp.value, dew.value, units.temperature)
364
365
366def sanitize(report: str) -> Tuple[str, str, List[str], Sanitization]:
367    """Returns a sanitized report, remarks, and elements ready for parsing"""
368    sans = Sanitization()
369    clean = clean_metar_string(report, sans)
370    data, remark_str = get_remarks(clean)
371    data = core.dedupe(data)
372    data = clean_metar_list(data, sans)
373    clean = " ".join(data)
374    if remark_str:
375        clean += f" {remark_str}"
376    return clean, remark_str, data, sans
377
378
379def parse(
380    station: str,
381    report: str,
382    issued: Optional[date] = None,
383    use_na: Optional[bool] = None,
384) -> Tuple[Optional[MetarData], Optional[Units], Optional[Sanitization]]:
385    """Returns MetarData and Units dataclasses with parsed data and their associated units"""
386    valid_station(station)
387    if not report:
388        return None, None, None
389    if use_na is None:
390        use_na = uses_na_format(station[:2])
391    parser = parse_na if use_na else parse_in
392    return parser(report, issued)
393
394
395def parse_na(
396    report: str, issued: Optional[date] = None
397) -> Tuple[MetarData, Units, Sanitization]:
398    """Parser for the North American METAR variant"""
399    # pylint: disable=too-many-locals
400    units = Units.north_american()
401    sanitized, remarks_str, data, sans = sanitize(report)
402    data, station, time = core.get_station_and_time(data)
403    data, runway_visibility = get_runway_visibility(data)
404    data, clouds = core.get_clouds(data)
405    (
406        data,
407        wind_direction,
408        wind_speed,
409        wind_gust,
410        wind_variable_direction,
411    ) = core.get_wind(data, units)
412    data, altimeter = get_altimeter(data, units, "NA")
413    data, visibility = core.get_visibility(data, units)
414    data, temperature, dewpoint = get_temp_and_dew(data)
415    condition = core.get_flight_rules(visibility, core.get_ceiling(clouds))
416    other, wx_codes = get_wx_codes(data)
417    remarks_info = remarks.parse(remarks_str)
418    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
419    struct = MetarData(
420        altimeter=altimeter,
421        clouds=clouds,
422        dewpoint=dewpoint,
423        flight_rules=FLIGHT_RULES[condition],
424        other=other,
425        raw=report,
426        relative_humidity=humidity,
427        remarks_info=remarks_info,
428        remarks=remarks_str,
429        runway_visibility=runway_visibility,
430        sanitized=sanitized,
431        station=station,
432        temperature=temperature,
433        time=core.make_timestamp(time, target_date=issued),
434        visibility=visibility,
435        wind_direction=wind_direction,
436        wind_gust=wind_gust,
437        wind_speed=wind_speed,
438        wind_variable_direction=wind_variable_direction,
439        wx_codes=wx_codes,
440    )
441    return struct, units, sans
442
443
444def parse_in(
445    report: str, issued: Optional[date] = None
446) -> Tuple[MetarData, Units, Sanitization]:
447    """Parser for the International METAR variant"""
448    # pylint: disable=too-many-locals
449    units = Units.international()
450    sanitized, remarks_str, data, sans = sanitize(report)
451    data, station, time = core.get_station_and_time(data)
452    data, runway_visibility = get_runway_visibility(data)
453    if "CAVOK" not in data:
454        data, clouds = core.get_clouds(data)
455    (
456        data,
457        wind_direction,
458        wind_speed,
459        wind_gust,
460        wind_variable_direction,
461    ) = core.get_wind(data, units)
462    data, altimeter = get_altimeter(data, units, "IN")
463    if "CAVOK" in data:
464        visibility = core.make_number("CAVOK")
465        clouds = []
466        data.remove("CAVOK")
467    else:
468        data, visibility = core.get_visibility(data, units)
469    data, temperature, dewpoint = get_temp_and_dew(data)
470    condition = core.get_flight_rules(visibility, core.get_ceiling(clouds))
471    other, wx_codes = get_wx_codes(data)
472    remarks_info = remarks.parse(remarks_str)
473    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
474    struct = MetarData(
475        altimeter=altimeter,
476        clouds=clouds,
477        dewpoint=dewpoint,
478        flight_rules=FLIGHT_RULES[condition],
479        other=other,
480        raw=report,
481        relative_humidity=humidity,
482        remarks_info=remarks_info,
483        remarks=remarks_str,
484        runway_visibility=runway_visibility,
485        sanitized=sanitized,
486        station=station,
487        temperature=temperature,
488        time=core.make_timestamp(time, target_date=issued),
489        visibility=visibility,
490        wind_direction=wind_direction,
491        wind_gust=wind_gust,
492        wind_speed=wind_speed,
493        wind_variable_direction=wind_variable_direction,
494        wx_codes=wx_codes,
495    )
496    return struct, units, sans
class Metar(avwx.current.base.Report):
 38class Metar(Report):
 39    """
 40    The Metar class offers an object-oriented approach to managing METAR data
 41    for a single station.
 42
 43    Below is typical usage for fetching and pulling METAR data for KJFK.
 44
 45    ```python
 46    >>> from avwx import Metar
 47    >>> kjfk = Metar("KJFK")
 48    >>> kjfk.station.name
 49    'John F Kennedy International Airport'
 50    >>> kjfk.update()
 51    True
 52    >>> kjfk.last_updated
 53    datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
 54    >>> kjfk.raw
 55    'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
 56    >>> kjfk.data.flight_rules
 57    'VFR'
 58    >>> kjfk.translations.remarks
 59    {'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}
 60    ```
 61
 62    The `parse` and `from_report` methods can parse a report string if you want
 63    to override the normal fetching process. Here's an example of a really bad
 64    day.
 65
 66    ```python
 67    >>> from avwx import Metar
 68    >>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
 69    >>> ksfo = Metar.from_report(report)
 70    True
 71    >>> ksfo.station.city
 72    'San Francisco'
 73    >>> ksfo.last_updated
 74    datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
 75    >>> ksfo.data.flight_rules
 76    'LIFR'
 77    >>> ksfo.translations.clouds
 78    'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
 79    >>> ksfo.summary
 80    'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
 81    ```
 82    """
 83
 84    data: Optional[MetarData] = None
 85    translations: Optional[MetarTrans] = None
 86
 87    async def _pull_from_default(self) -> None:
 88        """Checks for a more recent report from NOAA. Only sync"""
 89        service = NOAA(self.__class__.__name__.lower())
 90        if self.code is None:
 91            return
 92        report = await service.async_fetch(self.code)
 93        if report is not None:
 94            data, units, sans = parse(self.code, report, self.issued)
 95            if not data or data.time is None or data.time.dt is None:
 96                return
 97            if (
 98                not self.data
 99                or self.data.time is None
100                or self.data.time.dt is None
101                or data.time.dt > self.data.time.dt
102            ):
103                self.data, self.units, self.sanitization = data, units, sans
104                self.source = service.root
105
106    @property
107    def _should_check_default(self) -> bool:
108        """Returns True if pulled from regional source and potentially out of date"""
109        if isinstance(self.service, NOAA) or self.source is None:
110            return False
111
112        if self.data is None or self.data.time is None or self.data.time.dt is None:
113            return True
114        time_since = datetime.now(tz=timezone.utc) - self.data.time.dt
115        return time_since > timedelta(minutes=90)
116
117    def _calculate_altitudes(self) -> None:
118        """Adds the pressure and density altitudes to data if all fields are available"""
119        if self.data is None or self.station is None or self.units is None:
120            return
121        # Select decimal temperature if available
122        temp = self.data.temperature
123        if self.data.remarks_info is not None:
124            temp = self.data.remarks_info.temperature_decimal or temp
125        alt = self.data.altimeter
126        if temp is None or temp.value is None or alt is None or alt.value is None:
127            return
128        elev = self.station.elevation_ft
129        if elev is None:
130            return
131        self.data.pressure_altitude = core.pressure_altitude(
132            alt.value, elev, self.units.altimeter
133        )
134        self.data.density_altitude = core.density_altitude(
135            alt.value, temp.value, elev, self.units
136        )
137
138    async def _post_update(self) -> None:
139        if self.code is None or self.raw is None:
140            return
141        self.data, self.units, self.sanitization = parse(
142            self.code, self.raw, self.issued
143        )
144        if self._should_check_default:
145            await self._pull_from_default()
146        if self.data is None or self.units is None:
147            return
148        self._calculate_altitudes()
149        self.translations = translate_metar(self.data, self.units)
150
151    def _post_parse(self) -> None:
152        if self.code is None or self.raw is None:
153            return
154        self.data, self.units, self.sanitization = parse(
155            self.code, self.raw, self.issued
156        )
157        if self.data is None or self.units is None:
158            return
159        self._calculate_altitudes()
160        self.translations = translate_metar(self.data, self.units)
161
162    @staticmethod
163    def sanitize(report: str) -> str:
164        """Sanitizes a METAR string"""
165        return sanitize(report)[0]
166
167    @property
168    def summary(self) -> Optional[str]:
169        """Condensed report summary created from translations"""
170        if not self.translations:
171            self.update()
172        return None if self.translations is None else summary.metar(self.translations)
173
174    @property
175    def speech(self) -> Optional[str]:
176        """Report summary designed to be read by a text-to-speech program"""
177        if not self.data:
178            self.update()
179        if self.data is None or self.units is None:
180            return None
181        return speech.metar(self.data, self.units)

The Metar class offers an object-oriented approach to managing METAR data for a single station.

Below is typical usage for fetching and pulling METAR data for KJFK.

>>> from avwx import Metar
>>> kjfk = Metar("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
>>> kjfk.raw
'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
>>> kjfk.data.flight_rules
'VFR'
>>> kjfk.translations.remarks
{'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}

The parse and from_report methods can parse a report string if you want to override the normal fetching process. Here's an example of a really bad day.

>>> from avwx import Metar
>>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
>>> ksfo = Metar.from_report(report)
True
>>> ksfo.station.city
'San Francisco'
>>> ksfo.last_updated
datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
>>> ksfo.data.flight_rules
'LIFR'
>>> ksfo.translations.clouds
'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
>>> ksfo.summary
'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
data: Optional[avwx.structs.MetarData] = None
translations: Optional[avwx.structs.MetarTrans] = None
@staticmethod
def sanitize(report: str) -> str:
162    @staticmethod
163    def sanitize(report: str) -> str:
164        """Sanitizes a METAR string"""
165        return sanitize(report)[0]

Sanitizes a METAR string

summary: Optional[str]

Condensed report summary created from translations

speech: Optional[str]

Report summary designed to be read by a text-to-speech program

def get_remarks(txt: str) -> Tuple[List[str], str]:
184def get_remarks(txt: str) -> Tuple[List[str], str]:
185    """Returns the report split into components and the remarks string
186
187    Remarks can include items like RMK and on, NOSIG and on, and BECMG and on
188    """
189    txt = txt.replace("?", "").strip()
190    # First look for Altimeter in txt
191    alt_index = len(txt) + 1
192    for item in [" A2", " A3", " Q1", " Q0", " Q9"]:
193        index = txt.find(item)
194        if len(txt) - 6 > index > -1 and txt[index + 2 : index + 6].isdigit():
195            alt_index = index
196    # Then look for earliest remarks 'signifier'
197    sig_index = core.find_first_in_list(txt, METAR_RMK)
198    if sig_index == -1:
199        sig_index = len(txt) + 1
200    if sig_index > alt_index > -1:
201        return txt[: alt_index + 6].strip().split(), txt[alt_index + 7 :]
202    if alt_index > sig_index > -1:
203        return txt[:sig_index].strip().split(), txt[sig_index + 1 :]
204    return txt.strip().split(), ""

Returns the report split into components and the remarks string

Remarks can include items like RMK and on, NOSIG and on, and BECMG and on

def parse_runway_visibility(value: str) -> avwx.structs.RunwayVisibility:
235def parse_runway_visibility(value: str) -> RunwayVisibility:
236    """Parse a runway visibility range string"""
237    raw, trend = value, None
238    # TODO: update to check and convert units post visibility parse
239    value = value.replace("FT", "")
240    with suppress(KeyError):
241        trend = Code(value[-1], _RVR_CODES[value[-1]])
242        value = value[:-1]
243    runway, value, *_ = value[1:].split("/")
244    if value:
245        possible_numbers = [_parse_rvr_number(n) for n in value.split("V")]
246        numbers = [n for n in possible_numbers if n is not None]
247        visibility = numbers.pop() if len(numbers) == 1 else None
248    else:
249        visibility, numbers = None, []
250    return RunwayVisibility(
251        repr=raw,
252        runway=runway,
253        visibility=visibility,
254        variable_visibility=numbers,
255        trend=trend,
256    )

Parse a runway visibility range string

def get_runway_visibility(data: List[str]) -> Tuple[List[str], List[avwx.structs.RunwayVisibility]]:
259def get_runway_visibility(data: List[str]) -> Tuple[List[str], List[RunwayVisibility]]:
260    """Returns the report list and the remove runway visibility list"""
261    runway_vis = [
262        parse_runway_visibility(data.pop(i))
263        for i, item in reversed(list(enumerate(data)))
264        if core.is_runway_visibility(item)
265    ]
266    runway_vis.sort(key=lambda x: x.runway)
267    return data, runway_vis

Returns the report list and the remove runway visibility list

def parse_altimeter(value: str) -> Optional[avwx.structs.Number]:
270def parse_altimeter(value: str) -> Optional[Number]:
271    """Parse an altimeter string into a Number"""
272    if not value or len(value) < 4:
273        return None
274    # QNH3003INS
275    if len(value) >= 7 and value.endswith("INS"):
276        return core.make_number(f"{value[-7:-5]}.{value[-5:-3]}", value, literal=True)
277    number = value.replace(".", "")
278    # Q1000/10
279    if "/" in number:
280        number = number.split("/")[0]
281    if number.startswith("QNH"):
282        number = f"Q{number[1:]}"
283    if not (len(number) in {4, 5} and number[-4:].isdigit()):
284        return None
285    number = number.lstrip("AQ")
286    if number[0] in ("2", "3"):
287        number = f"{number[:2]}.{number[2:]}"
288    elif number[0] not in ("0", "1"):
289        return None
290    return core.make_number(number, value, number, literal=True)

Parse an altimeter string into a Number

def get_altimeter( data: List[str], units: avwx.structs.Units, version: str = 'NA') -> Tuple[List[str], Optional[avwx.structs.Number]]:
293def get_altimeter(
294    data: List[str], units: Units, version: str = "NA"
295) -> Tuple[List[str], Optional[Number]]:
296    """Returns the report list and the removed altimeter item
297
298    Version is 'NA' (North American / default) or 'IN' (International)
299    """
300    values: List[Number] = []
301    for _ in range(2):
302        if not data:
303            break
304        value = parse_altimeter(data[-1])
305        if value is None:
306            break
307        values.append(value)
308        data.pop(-1)
309    if not values:
310        return data, None
311    values.sort(key=lambda x: x.value or 0)
312    altimeter = values[0 if version == "NA" else -1]
313    if altimeter.value is not None:
314        units.altimeter = "inHg" if altimeter.value < 100 else "hPa"
315    return data, altimeter

Returns the report list and the removed altimeter item

Version is 'NA' (North American / default) or 'IN' (International)

def get_temp_and_dew( data: List[str]) -> Tuple[List[str], Optional[avwx.structs.Number], Optional[avwx.structs.Number]]:
318def get_temp_and_dew(
319    data: List[str],
320) -> Tuple[List[str], Optional[Number], Optional[Number]]:
321    """Returns the report list and removed temperature and dewpoint strings"""
322    for i, item in reversed(list(enumerate(data))):
323        if "/" in item:
324            # ///07
325            if item[0] == "/":
326                item = "/" + item.lstrip("/")
327            # 07///
328            elif item[-1] == "/":
329                item = item.rstrip("/") + "/"
330            tempdew = item.split("/")
331            if len(tempdew) != 2:
332                continue
333            valid = True
334            for j, temp in enumerate(tempdew):
335                if temp in ["MM", "XX"]:
336                    tempdew[j] = ""
337                elif not core.is_possible_temp(temp):
338                    valid = False
339                    break
340            if valid:
341                data.pop(i)
342                temp, dew = tempdew
343                return data, core.make_number(temp), core.make_number(dew)
344    return data, None, None

Returns the report list and removed temperature and dewpoint strings

def get_relative_humidity( temperature: Optional[avwx.structs.Number], dewpoint: Optional[avwx.structs.Number], remarks_info: Optional[avwx.structs.RemarksData], units: avwx.structs.Units) -> Optional[float]:
347def get_relative_humidity(
348    temperature: Optional[Number],
349    dewpoint: Optional[Number],
350    remarks_info: Optional[RemarksData],
351    units: Units,
352) -> Optional[float]:
353    """Calculates relative humidity from preferred temperature and dewpoint"""
354    if remarks_info is not None:
355        temp = remarks_info.temperature_decimal or temperature
356        dew = remarks_info.dewpoint_decimal or dewpoint
357    else:
358        temp = temperature
359        dew = dewpoint
360    if temp is None or temp.value is None:
361        return None
362    if dew is None or dew.value is None:
363        return None
364    return core.relative_humidity(temp.value, dew.value, units.temperature)

Calculates relative humidity from preferred temperature and dewpoint

def sanitize(report: str) -> Tuple[str, str, List[str], avwx.structs.Sanitization]:
367def sanitize(report: str) -> Tuple[str, str, List[str], Sanitization]:
368    """Returns a sanitized report, remarks, and elements ready for parsing"""
369    sans = Sanitization()
370    clean = clean_metar_string(report, sans)
371    data, remark_str = get_remarks(clean)
372    data = core.dedupe(data)
373    data = clean_metar_list(data, sans)
374    clean = " ".join(data)
375    if remark_str:
376        clean += f" {remark_str}"
377    return clean, remark_str, data, sans

Returns a sanitized report, remarks, and elements ready for parsing

def parse( station: str, report: str, issued: Optional[datetime.date] = None, use_na: Optional[bool] = None) -> Tuple[Optional[avwx.structs.MetarData], Optional[avwx.structs.Units], Optional[avwx.structs.Sanitization]]:
380def parse(
381    station: str,
382    report: str,
383    issued: Optional[date] = None,
384    use_na: Optional[bool] = None,
385) -> Tuple[Optional[MetarData], Optional[Units], Optional[Sanitization]]:
386    """Returns MetarData and Units dataclasses with parsed data and their associated units"""
387    valid_station(station)
388    if not report:
389        return None, None, None
390    if use_na is None:
391        use_na = uses_na_format(station[:2])
392    parser = parse_na if use_na else parse_in
393    return parser(report, issued)

Returns MetarData and Units dataclasses with parsed data and their associated units

def parse_na( report: str, issued: Optional[datetime.date] = None) -> Tuple[avwx.structs.MetarData, avwx.structs.Units, avwx.structs.Sanitization]:
396def parse_na(
397    report: str, issued: Optional[date] = None
398) -> Tuple[MetarData, Units, Sanitization]:
399    """Parser for the North American METAR variant"""
400    # pylint: disable=too-many-locals
401    units = Units.north_american()
402    sanitized, remarks_str, data, sans = sanitize(report)
403    data, station, time = core.get_station_and_time(data)
404    data, runway_visibility = get_runway_visibility(data)
405    data, clouds = core.get_clouds(data)
406    (
407        data,
408        wind_direction,
409        wind_speed,
410        wind_gust,
411        wind_variable_direction,
412    ) = core.get_wind(data, units)
413    data, altimeter = get_altimeter(data, units, "NA")
414    data, visibility = core.get_visibility(data, units)
415    data, temperature, dewpoint = get_temp_and_dew(data)
416    condition = core.get_flight_rules(visibility, core.get_ceiling(clouds))
417    other, wx_codes = get_wx_codes(data)
418    remarks_info = remarks.parse(remarks_str)
419    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
420    struct = MetarData(
421        altimeter=altimeter,
422        clouds=clouds,
423        dewpoint=dewpoint,
424        flight_rules=FLIGHT_RULES[condition],
425        other=other,
426        raw=report,
427        relative_humidity=humidity,
428        remarks_info=remarks_info,
429        remarks=remarks_str,
430        runway_visibility=runway_visibility,
431        sanitized=sanitized,
432        station=station,
433        temperature=temperature,
434        time=core.make_timestamp(time, target_date=issued),
435        visibility=visibility,
436        wind_direction=wind_direction,
437        wind_gust=wind_gust,
438        wind_speed=wind_speed,
439        wind_variable_direction=wind_variable_direction,
440        wx_codes=wx_codes,
441    )
442    return struct, units, sans

Parser for the North American METAR variant

def parse_in( report: str, issued: Optional[datetime.date] = None) -> Tuple[avwx.structs.MetarData, avwx.structs.Units, avwx.structs.Sanitization]:
445def parse_in(
446    report: str, issued: Optional[date] = None
447) -> Tuple[MetarData, Units, Sanitization]:
448    """Parser for the International METAR variant"""
449    # pylint: disable=too-many-locals
450    units = Units.international()
451    sanitized, remarks_str, data, sans = sanitize(report)
452    data, station, time = core.get_station_and_time(data)
453    data, runway_visibility = get_runway_visibility(data)
454    if "CAVOK" not in data:
455        data, clouds = core.get_clouds(data)
456    (
457        data,
458        wind_direction,
459        wind_speed,
460        wind_gust,
461        wind_variable_direction,
462    ) = core.get_wind(data, units)
463    data, altimeter = get_altimeter(data, units, "IN")
464    if "CAVOK" in data:
465        visibility = core.make_number("CAVOK")
466        clouds = []
467        data.remove("CAVOK")
468    else:
469        data, visibility = core.get_visibility(data, units)
470    data, temperature, dewpoint = get_temp_and_dew(data)
471    condition = core.get_flight_rules(visibility, core.get_ceiling(clouds))
472    other, wx_codes = get_wx_codes(data)
473    remarks_info = remarks.parse(remarks_str)
474    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
475    struct = MetarData(
476        altimeter=altimeter,
477        clouds=clouds,
478        dewpoint=dewpoint,
479        flight_rules=FLIGHT_RULES[condition],
480        other=other,
481        raw=report,
482        relative_humidity=humidity,
483        remarks_info=remarks_info,
484        remarks=remarks_str,
485        runway_visibility=runway_visibility,
486        sanitized=sanitized,
487        station=station,
488        temperature=temperature,
489        time=core.make_timestamp(time, target_date=issued),
490        visibility=visibility,
491        wind_direction=wind_direction,
492        wind_gust=wind_gust,
493        wind_speed=wind_speed,
494        wind_variable_direction=wind_variable_direction,
495        wx_codes=wx_codes,
496    )
497    return struct, units, sans

Parser for the International METAR variant