avwx.current.metar

A METAR (Meteorological Aerodrome Report) is the surface weather observed at most controlled (and some uncontrolled) airports. They are updated once per hour or when conditions change enough to warrant an update, and the observations are valid for one hour after the report was issued or until the next report is issued.

  1"""
  2A METAR (Meteorological Aerodrome Report) is the surface weather observed at
  3most controlled (and some uncontrolled) airports. They are updated once per
  4hour or when conditions change enough to warrant an update, and the
  5observations are valid for one hour after the report was issued or until the
  6next report is issued.
  7"""
  8
  9# stdlib
 10from __future__ import annotations
 11
 12from contextlib import suppress
 13from datetime import date, datetime, timedelta, timezone
 14
 15# module
 16from avwx.current.base import Report, get_wx_codes
 17from avwx.parsing import core, remarks, speech, summary
 18from avwx.parsing.sanitization.metar import clean_metar_list, clean_metar_string
 19from avwx.parsing.translate.metar import translate_metar
 20from avwx.service import Noaa
 21from avwx.static.core import FLIGHT_RULES
 22from avwx.static.metar import METAR_RMK
 23from avwx.station import uses_na_format, valid_station
 24from avwx.structs import (
 25    Code,
 26    MetarData,
 27    MetarTrans,
 28    Number,
 29    RemarksData,
 30    RunwayVisibility,
 31    Sanitization,
 32    Units,
 33)
 34
 35
 36class Metar(Report):
 37    """The Metar class offers an object-oriented approach to managing METAR data
 38    for a single station.
 39
 40    Below is typical usage for fetching and pulling METAR data for KJFK.
 41
 42    ```python
 43    >>> from avwx import Metar
 44    >>> kjfk = Metar("KJFK")
 45    >>> kjfk.station.name
 46    'John F Kennedy International Airport'
 47    >>> kjfk.update()
 48    True
 49    >>> kjfk.last_updated
 50    datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
 51    >>> kjfk.raw
 52    'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
 53    >>> kjfk.data.flight_rules
 54    'VFR'
 55    >>> kjfk.translations.remarks
 56    {'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}
 57    ```
 58
 59    The `parse` and `from_report` methods can parse a report string if you want
 60    to override the normal fetching process. Here's an example of a really bad
 61    day.
 62
 63    ```python
 64    >>> from avwx import Metar
 65    >>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
 66    >>> ksfo = Metar.from_report(report)
 67    True
 68    >>> ksfo.station.city
 69    'San Francisco'
 70    >>> ksfo.last_updated
 71    datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
 72    >>> ksfo.data.flight_rules
 73    'LIFR'
 74    >>> ksfo.translations.clouds
 75    'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
 76    >>> ksfo.summary
 77    'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
 78    ```
 79    """
 80
 81    data: MetarData | None = None
 82    translations: MetarTrans | None = None
 83
 84    async def _pull_from_default(self) -> None:
 85        """Check for a more recent report from NOAA."""
 86        service = Noaa(self.__class__.__name__.lower())
 87        if self.code is None:
 88            return
 89        report = await service.async_fetch(self.code)
 90        if report is not None:
 91            data, units, sans = parse(self.code, report, self.issued)
 92            if not data or data.time is None or data.time.dt is None:
 93                return
 94            if not self.data or self.data.time is None or self.data.time.dt is None or data.time.dt > self.data.time.dt:
 95                self.data, self.units, self.sanitization = data, units, sans
 96                self.source = service.root
 97
 98    @property
 99    def _should_check_default(self) -> bool:
100        """Return True if pulled from regional source and potentially out of date."""
101        if isinstance(self.service, Noaa) or self.source is None:
102            return False
103
104        if self.data is None or self.data.time is None or self.data.time.dt is None:
105            return True
106        time_since = datetime.now(tz=timezone.utc) - self.data.time.dt
107        return time_since > timedelta(minutes=90)
108
109    def _calculate_altitudes(self) -> None:
110        """Add the pressure and density altitudes to data if all fields are available."""
111        if self.data is None or self.station is None or self.units is None:
112            return
113        # Select decimal temperature if available
114        temp = self.data.temperature
115        if self.data.remarks_info is not None:
116            temp = self.data.remarks_info.temperature_decimal or temp
117        alt = self.data.altimeter
118        if temp is None or temp.value is None or alt is None or alt.value is None:
119            return
120        elev = self.station.elevation_ft
121        if elev is None:
122            return
123        self.data.pressure_altitude = core.pressure_altitude(alt.value, elev, self.units.altimeter)
124        self.data.density_altitude = core.density_altitude(alt.value, temp.value, elev, self.units)
125
126    async def _post_update(self) -> None:
127        if self.code is None or self.raw is None:
128            return
129        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
130        if self._should_check_default:
131            await self._pull_from_default()
132        if self.data is None or self.units is None:
133            return
134        self._calculate_altitudes()
135        self.translations = translate_metar(self.data, self.units)
136
137    def _post_parse(self) -> None:
138        if self.code is None or self.raw is None:
139            return
140        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
141        if self.data is None or self.units is None:
142            return
143        self._calculate_altitudes()
144        self.translations = translate_metar(self.data, self.units)
145
146    @staticmethod
147    def sanitize(report: str) -> str:
148        """Sanitize a METAR string."""
149        return sanitize(report)[0]
150
151    @property
152    def summary(self) -> str | None:
153        """Condensed report summary created from translations."""
154        if not self.translations:
155            self.update()
156        return None if self.translations is None else summary.metar(self.translations)
157
158    @property
159    def speech(self) -> str | None:
160        """Report summary designed to be read by a text-to-speech program."""
161        if not self.data:
162            self.update()
163        if self.data is None or self.units is None:
164            return None
165        return speech.metar(self.data, self.units)
166
167
def get_remarks(txt: str) -> tuple[list[str], str]:
    """Split a report into its body elements and its remarks string.

    Question marks are dropped first. The split point is either just after an
    altimeter-looking group or at the earliest remarks signifier, whichever
    comes first in the text. Remarks can include items like RMK and on, NOSIG
    and on, and BECMG and on. When neither is found the remarks are empty.
    """
    txt = txt.replace("?", "").strip()
    length = len(txt)
    not_found = length + 1
    # Locate an altimeter group: prefix followed by four digits, not at the very end
    alt_index = not_found
    for marker in (" A2", " A3", " Q1", " Q0", " Q9"):
        found = txt.find(marker)
        if found <= -1 or found >= length - 6:
            continue
        if txt[found + 2 : found + 6].isdigit():
            alt_index = found
    # Locate the earliest remarks signifier
    sig_index = core.find_first_in_list(txt, METAR_RMK)
    if sig_index == -1:
        sig_index = not_found
    if -1 < alt_index < sig_index:
        body, remark_str = txt[: alt_index + 6], txt[alt_index + 7 :]
    elif -1 < sig_index < alt_index:
        body, remark_str = txt[:sig_index], txt[sig_index + 1 :]
    else:
        body, remark_str = txt, ""
    return body.strip().split(), remark_str
189
190
191_RVR_CODES = {
192    "M": "less than",
193    "A": "greater than",
194    "P": "greater than",
195    "U": "increasing",
196    "I": "increasing",
197    "D": "decreasing",
198    "F": "decreasing",
199    "R": "decreasing",
200    "N": "no change",
201    "V": "variable",
202}
203
204
205def _parse_rvr_number(value: str) -> Number | None:
206    if not value:
207        return None
208    raw, prefix = value, None
209    with suppress(KeyError):
210        prefix = _RVR_CODES[value[0]]
211        value = value[1:]
212    number = core.make_number(value, raw)
213    if number is not None and prefix is not None:
214        number.spoken = f"{prefix} {number.spoken}"
215        number.value = None
216    return number
217
218
def parse_runway_visibility(value: str) -> RunwayVisibility:
    """Parse a runway visibility range string.

    "FT" is stripped, a trailing modifier letter becomes the trend, and the
    text after the first "/" is split on "V" into one or more values. A single
    value is reported as `visibility`; otherwise the values are reported as
    `variable_visibility`.
    """
    raw = value
    # TODO: update to check and convert units post visibility parse
    value = value.replace("FT", "")
    trend = None
    trend_text = _RVR_CODES.get(value[-1])
    if trend_text is not None:
        trend = Code(value[-1], trend_text)
        value = value[:-1]
    # Drop the first character, then split the runway from the values
    runway, vis_text, *_ = value[1:].split("/")
    visibility = None
    numbers = []
    if vis_text:
        parsed = map(_parse_rvr_number, vis_text.split("V"))
        numbers = [number for number in parsed if number is not None]
        if len(numbers) == 1:
            visibility = numbers.pop()
    return RunwayVisibility(
        repr=raw,
        runway=runway,
        visibility=visibility,
        variable_visibility=numbers,
        trend=trend,
    )
241
242
def get_runway_visibility(data: list[str]) -> tuple[list[str], list[RunwayVisibility]]:
    """Return the report list and the removed runway visibility list.

    Matching elements are popped from `data` in place and returned parsed,
    sorted by runway.
    """
    runway_vis = []
    # Walk backwards so popping never shifts an index still to be visited
    for index in range(len(data) - 1, -1, -1):
        if core.is_runway_visibility(data[index]):
            runway_vis.append(parse_runway_visibility(data.pop(index)))
    runway_vis.sort(key=lambda entry: entry.runway)
    return data, runway_vis
252
253
def parse_altimeter(value: str | None) -> Number | None:
    """Parse an altimeter string into a Number.

    Returns None when the value is missing, shorter than four characters, or
    does not reduce to an optional one-letter prefix followed by four digits
    whose first digit is 0-3. Values starting with 2 or 3 get a decimal point
    after the second digit.
    """
    if not value or len(value) < 4:
        return None
    # QNH3003INS
    if len(value) >= 7 and value.endswith("INS"):
        return core.make_number(f"{value[-7:-5]}.{value[-5:-3]}", value, literal=True)
    number = value.replace(".", "")
    # Q1000/10
    if "/" in number:
        number = number.split("/")[0]
    # QNH1013 -> Q1013. Slicing from 1 would rebuild the same "QNH..." string,
    # which then fails the length check below.
    if number.startswith("QNH"):
        number = f"Q{number[3:]}"
    if not (len(number) in {4, 5} and number[-4:].isdigit()):
        return None
    number = number.lstrip("AQ")
    if number[0] in ("2", "3"):
        number = f"{number[:2]}.{number[2:]}"
    elif number[0] not in ("0", "1"):
        return None
    return core.make_number(number, value, number, literal=True)
275
276
def get_altimeter(data: list[str], units: Units, version: str = "NA") -> tuple[list[str], Number | None]:
    """Return the report list and the removed altimeter item.

    Up to two trailing elements are consumed while they parse as altimeters.
    Version is 'NA' (North American / default), which selects the smallest
    parsed value, or 'IN' (International), which selects the largest.
    `units.altimeter` is set from the magnitude of the selected value.
    """
    values: list[Number] = []
    while data and len(values) < 2:
        parsed = parse_altimeter(data[-1])
        if parsed is None:
            break
        values.append(parsed)
        data.pop()
    if not values:
        return data, None
    ordered = sorted(values, key=lambda number: number.value or 0)
    altimeter = ordered[0] if version == "NA" else ordered[-1]
    if altimeter.value is not None:
        units.altimeter = "inHg" if altimeter.value < 100 else "hPa"
    return data, altimeter
298
299
def get_temp_and_dew(
    data: list[str],
) -> tuple[list[str], Number | None, Number | None]:
    """Return the report list and the removed temperature and dewpoint.

    Elements are searched from the end. The first one that splits on "/" into
    exactly two acceptable halves is popped and converted; "MM" and "XX"
    halves are treated as empty.
    """
    for index in range(len(data) - 1, -1, -1):
        token = data[index]
        if "/" not in token:
            continue
        # ///07
        if token[0] == "/":
            token = "/" + token.lstrip("/")
        # 07///
        elif token[-1] == "/":
            token = token.rstrip("/") + "/"
        halves = token.split("/")
        if len(halves) != 2:
            continue
        cleaned = []
        for half in halves:
            if half in ["MM", "XX"]:
                cleaned.append("")
            elif core.is_possible_temp(half):
                cleaned.append(half)
            else:
                break
        else:
            # Both halves were acceptable
            data.pop(index)
            temp, dew = cleaned
            return data, core.make_number(temp), core.make_number(dew)
    return data, None, None
327
328
def get_relative_humidity(
    temperature: Number | None,
    dewpoint: Number | None,
    remarks_info: RemarksData | None,
    units: Units,
) -> float | None:
    """Calculate relative humidity from preferred temperature and dewpoint.

    The decimal values from the remarks are preferred when present. Returns
    None when either reading or its value is missing.
    """
    temp, dew = temperature, dewpoint
    if remarks_info is not None:
        temp = remarks_info.temperature_decimal or temp
        dew = remarks_info.dewpoint_decimal or dew
    for reading in (temp, dew):
        if reading is None or reading.value is None:
            return None
    return core.relative_humidity(temp.value, dew.value, units.temperature)
347
348
def sanitize(report: str) -> tuple[str, str, list[str], Sanitization]:
    """Return a sanitized report, remarks, and elements ready for parsing.

    The fourth item is the Sanitization object passed to the cleaning helpers.
    """
    sans = Sanitization()
    cleaned_text = clean_metar_string(report, sans)
    elements, remark_str = get_remarks(cleaned_text)
    elements = clean_metar_list(core.dedupe(elements), sans)
    # Rebuild the report text from the cleaned elements plus any remarks
    sanitized = " ".join(elements)
    if remark_str:
        sanitized = f"{sanitized} {remark_str}"
    return sanitized, remark_str, elements, sans
360
361
def parse(
    station: str,
    report: str,
    issued: date | None = None,
    use_na: bool | None = None,
) -> tuple[MetarData | None, Units | None, Sanitization | None]:
    """Return MetarData and Units dataclasses with parsed data and their associated units.

    The station is validated first. An empty report yields three Nones. When
    `use_na` is not given, the format is chosen from the first two characters
    of the station.
    """
    valid_station(station)
    if not report:
        return None, None, None
    na_format = uses_na_format(station[:2]) if use_na is None else use_na
    if na_format:
        return parse_na(report, issued)
    return parse_in(report, issued)
376
377
def parse_na(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]:
    """Parser for the North American METAR variant.

    Sanitizes the report, then passes the element list through each extractor
    in turn; whatever is left goes to the weather-code parser. Returns the
    populated MetarData, the units, and the sanitization record.
    """
    units = Units.north_american()
    sanitized, remarks_str, data, sans = sanitize(report)
    # Each step receives the list returned by the previous one
    data, station, time = core.get_station_and_time(data)
    data, runway_visibility = get_runway_visibility(data)
    data, clouds = core.get_clouds(data)
    data, wind_direction, wind_speed, wind_gust, wind_variable_direction = core.get_wind(data, units)
    data, altimeter = get_altimeter(data, units, "NA")
    data, visibility = core.get_visibility(data, units)
    data, temperature, dewpoint = get_temp_and_dew(data)
    ceiling = core.get_ceiling(clouds)
    condition = core.get_flight_rules(visibility, ceiling)
    other, wx_codes = get_wx_codes(data)
    remarks_info = remarks.parse(remarks_str)
    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
    parsed = MetarData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks_str,
        remarks_info=remarks_info,
        flight_rules=FLIGHT_RULES[condition],
        wind_direction=wind_direction,
        wind_speed=wind_speed,
        wind_gust=wind_gust,
        wind_variable_direction=wind_variable_direction,
        visibility=visibility,
        runway_visibility=runway_visibility,
        clouds=clouds,
        wx_codes=wx_codes,
        other=other,
        temperature=temperature,
        dewpoint=dewpoint,
        relative_humidity=humidity,
        altimeter=altimeter,
    )
    return parsed, units, sans
422
423
def parse_in(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]:
    """Parser for the International METAR variant.

    Sanitizes the report, then passes the element list through each extractor
    in turn. When the report contains CAVOK, cloud and visibility extraction
    are skipped: clouds are empty and visibility is the CAVOK number. Returns
    the populated MetarData, the units, and the sanitization record.
    """
    units = Units.international()
    sanitized, remarks_str, data, sans = sanitize(report)
    data, station, time = core.get_station_and_time(data)
    data, runway_visibility = get_runway_visibility(data)
    # Decide CAVOK once so clouds is always bound, whatever the intermediate
    # extractors leave in the list.
    is_cavok = "CAVOK" in data
    clouds = []
    if not is_cavok:
        data, clouds = core.get_clouds(data)
    (
        data,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(data, units)
    data, altimeter = get_altimeter(data, units, "IN")
    if is_cavok:
        visibility = core.make_number("CAVOK")
        # Tolerate the element having been consumed by an earlier step
        with suppress(ValueError):
            data.remove("CAVOK")
    else:
        data, visibility = core.get_visibility(data, units)
    data, temperature, dewpoint = get_temp_and_dew(data)
    condition = core.get_flight_rules(visibility, core.get_ceiling(clouds))
    other, wx_codes = get_wx_codes(data)
    remarks_info = remarks.parse(remarks_str)
    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
    struct = MetarData(
        altimeter=altimeter,
        clouds=clouds,
        dewpoint=dewpoint,
        flight_rules=FLIGHT_RULES[condition],
        other=other,
        raw=report,
        relative_humidity=humidity,
        remarks_info=remarks_info,
        remarks=remarks_str,
        runway_visibility=runway_visibility,
        sanitized=sanitized,
        station=station,
        temperature=temperature,
        time=core.make_timestamp(time, target_date=issued),
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wind_variable_direction=wind_variable_direction,
        wx_codes=wx_codes,
    )
    return struct, units, sans
class Metar(avwx.current.base.Report):
 37class Metar(Report):
 38    """The Metar class offers an object-oriented approach to managing METAR data
 39    for a single station.
 40
 41    Below is typical usage for fetching and pulling METAR data for KJFK.
 42
 43    ```python
 44    >>> from avwx import Metar
 45    >>> kjfk = Metar("KJFK")
 46    >>> kjfk.station.name
 47    'John F Kennedy International Airport'
 48    >>> kjfk.update()
 49    True
 50    >>> kjfk.last_updated
 51    datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
 52    >>> kjfk.raw
 53    'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
 54    >>> kjfk.data.flight_rules
 55    'VFR'
 56    >>> kjfk.translations.remarks
 57    {'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}
 58    ```
 59
 60    The `parse` and `from_report` methods can parse a report string if you want
 61    to override the normal fetching process. Here's an example of a really bad
 62    day.
 63
 64    ```python
 65    >>> from avwx import Metar
 66    >>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
 67    >>> ksfo = Metar.from_report(report)
 68    True
 69    >>> ksfo.station.city
 70    'San Francisco'
 71    >>> ksfo.last_updated
 72    datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
 73    >>> ksfo.data.flight_rules
 74    'LIFR'
 75    >>> ksfo.translations.clouds
 76    'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
 77    >>> ksfo.summary
 78    'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
 79    ```
 80    """
 81
 82    data: MetarData | None = None
 83    translations: MetarTrans | None = None
 84
 85    async def _pull_from_default(self) -> None:
 86        """Check for a more recent report from NOAA."""
 87        service = Noaa(self.__class__.__name__.lower())
 88        if self.code is None:
 89            return
 90        report = await service.async_fetch(self.code)
 91        if report is not None:
 92            data, units, sans = parse(self.code, report, self.issued)
 93            if not data or data.time is None or data.time.dt is None:
 94                return
 95            if not self.data or self.data.time is None or self.data.time.dt is None or data.time.dt > self.data.time.dt:
 96                self.data, self.units, self.sanitization = data, units, sans
 97                self.source = service.root
 98
 99    @property
100    def _should_check_default(self) -> bool:
101        """Return True if pulled from regional source and potentially out of date."""
102        if isinstance(self.service, Noaa) or self.source is None:
103            return False
104
105        if self.data is None or self.data.time is None or self.data.time.dt is None:
106            return True
107        time_since = datetime.now(tz=timezone.utc) - self.data.time.dt
108        return time_since > timedelta(minutes=90)
109
110    def _calculate_altitudes(self) -> None:
111        """Add the pressure and density altitudes to data if all fields are available."""
112        if self.data is None or self.station is None or self.units is None:
113            return
114        # Select decimal temperature if available
115        temp = self.data.temperature
116        if self.data.remarks_info is not None:
117            temp = self.data.remarks_info.temperature_decimal or temp
118        alt = self.data.altimeter
119        if temp is None or temp.value is None or alt is None or alt.value is None:
120            return
121        elev = self.station.elevation_ft
122        if elev is None:
123            return
124        self.data.pressure_altitude = core.pressure_altitude(alt.value, elev, self.units.altimeter)
125        self.data.density_altitude = core.density_altitude(alt.value, temp.value, elev, self.units)
126
127    async def _post_update(self) -> None:
128        if self.code is None or self.raw is None:
129            return
130        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
131        if self._should_check_default:
132            await self._pull_from_default()
133        if self.data is None or self.units is None:
134            return
135        self._calculate_altitudes()
136        self.translations = translate_metar(self.data, self.units)
137
138    def _post_parse(self) -> None:
139        if self.code is None or self.raw is None:
140            return
141        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
142        if self.data is None or self.units is None:
143            return
144        self._calculate_altitudes()
145        self.translations = translate_metar(self.data, self.units)
146
147    @staticmethod
148    def sanitize(report: str) -> str:
149        """Sanitize a METAR string."""
150        return sanitize(report)[0]
151
152    @property
153    def summary(self) -> str | None:
154        """Condensed report summary created from translations."""
155        if not self.translations:
156            self.update()
157        return None if self.translations is None else summary.metar(self.translations)
158
159    @property
160    def speech(self) -> str | None:
161        """Report summary designed to be read by a text-to-speech program."""
162        if not self.data:
163            self.update()
164        if self.data is None or self.units is None:
165            return None
166        return speech.metar(self.data, self.units)

The Metar class offers an object-oriented approach to managing METAR data for a single station.

Below is typical usage for fetching and pulling METAR data for KJFK.

>>> from avwx import Metar
>>> kjfk = Metar("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
>>> kjfk.raw
'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
>>> kjfk.data.flight_rules
'VFR'
>>> kjfk.translations.remarks
{'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}

The parse and from_report methods can parse a report string if you want to override the normal fetching process. Here's an example of a really bad day.

>>> from avwx import Metar
>>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
>>> ksfo = Metar.from_report(report)
>>> ksfo.station.city
'San Francisco'
>>> ksfo.last_updated
datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
>>> ksfo.data.flight_rules
'LIFR'
>>> ksfo.translations.clouds
'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
>>> ksfo.summary
'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
data: avwx.structs.MetarData | None = None
translations: avwx.structs.MetarTrans | None = None
@staticmethod
def sanitize(report: str) -> str:
147    @staticmethod
148    def sanitize(report: str) -> str:
149        """Sanitize a METAR string."""
150        return sanitize(report)[0]

Sanitize a METAR string.

summary: str | None
152    @property
153    def summary(self) -> str | None:
154        """Condensed report summary created from translations."""
155        if not self.translations:
156            self.update()
157        return None if self.translations is None else summary.metar(self.translations)

Condensed report summary created from translations.

speech: str | None
159    @property
160    def speech(self) -> str | None:
161        """Report summary designed to be read by a text-to-speech program."""
162        if not self.data:
163            self.update()
164        if self.data is None or self.units is None:
165            return None
166        return speech.metar(self.data, self.units)

Report summary designed to be read by a text-to-speech program.

def get_remarks(txt: str) -> tuple[list[str], str]:
def get_remarks(txt: str) -> tuple[list[str], str]:
    """Split a report into its body elements and its remarks string.

    Question marks are dropped first. The split point is either just after an
    altimeter-looking group or at the earliest remarks signifier, whichever
    comes first in the text. Remarks can include items like RMK and on, NOSIG
    and on, and BECMG and on. When neither is found the remarks are empty.
    """
    txt = txt.replace("?", "").strip()
    length = len(txt)
    not_found = length + 1
    # Locate an altimeter group: prefix followed by four digits, not at the very end
    alt_index = not_found
    for marker in (" A2", " A3", " Q1", " Q0", " Q9"):
        found = txt.find(marker)
        if found <= -1 or found >= length - 6:
            continue
        if txt[found + 2 : found + 6].isdigit():
            alt_index = found
    # Locate the earliest remarks signifier
    sig_index = core.find_first_in_list(txt, METAR_RMK)
    if sig_index == -1:
        sig_index = not_found
    if -1 < alt_index < sig_index:
        body, remark_str = txt[: alt_index + 6], txt[alt_index + 7 :]
    elif -1 < sig_index < alt_index:
        body, remark_str = txt[:sig_index], txt[sig_index + 1 :]
    else:
        body, remark_str = txt, ""
    return body.strip().split(), remark_str

Return the report split into components and the remarks string.

Remarks can include items like RMK and on, NOSIG and on, and BECMG and on

def parse_runway_visibility(value: str) -> avwx.structs.RunwayVisibility:
def parse_runway_visibility(value: str) -> RunwayVisibility:
    """Parse a runway visibility range string.

    "FT" is stripped, a trailing modifier letter becomes the trend, and the
    text after the first "/" is split on "V" into one or more values. A single
    value is reported as `visibility`; otherwise the values are reported as
    `variable_visibility`.
    """
    raw = value
    # TODO: update to check and convert units post visibility parse
    value = value.replace("FT", "")
    trend = None
    trend_text = _RVR_CODES.get(value[-1])
    if trend_text is not None:
        trend = Code(value[-1], trend_text)
        value = value[:-1]
    # Drop the first character, then split the runway from the values
    runway, vis_text, *_ = value[1:].split("/")
    visibility = None
    numbers = []
    if vis_text:
        parsed = map(_parse_rvr_number, vis_text.split("V"))
        numbers = [number for number in parsed if number is not None]
        if len(numbers) == 1:
            visibility = numbers.pop()
    return RunwayVisibility(
        repr=raw,
        runway=runway,
        visibility=visibility,
        variable_visibility=numbers,
        trend=trend,
    )

Parse a runway visibility range string.

def get_runway_visibility(data: list[str]) -> tuple[list[str], list[avwx.structs.RunwayVisibility]]:
def get_runway_visibility(data: list[str]) -> tuple[list[str], list[RunwayVisibility]]:
    """Return the report list and the removed runway visibility list.

    Matching elements are popped from `data` in place and returned parsed,
    sorted by runway.
    """
    runway_vis = []
    # Walk backwards so popping never shifts an index still to be visited
    for index in range(len(data) - 1, -1, -1):
        if core.is_runway_visibility(data[index]):
            runway_vis.append(parse_runway_visibility(data.pop(index)))
    runway_vis.sort(key=lambda entry: entry.runway)
    return data, runway_vis

Return the report list and the removed runway visibility list.

def parse_altimeter(value: str | None) -> avwx.structs.Number | None:
def parse_altimeter(value: str | None) -> Number | None:
    """Parse an altimeter string into a Number.

    Returns None when the value is missing, shorter than four characters, or
    does not reduce to an optional one-letter prefix followed by four digits
    whose first digit is 0-3. Values starting with 2 or 3 get a decimal point
    after the second digit.
    """
    if not value or len(value) < 4:
        return None
    # QNH3003INS
    if len(value) >= 7 and value.endswith("INS"):
        return core.make_number(f"{value[-7:-5]}.{value[-5:-3]}", value, literal=True)
    number = value.replace(".", "")
    # Q1000/10
    if "/" in number:
        number = number.split("/")[0]
    # QNH1013 -> Q1013. Slicing from 1 would rebuild the same "QNH..." string,
    # which then fails the length check below.
    if number.startswith("QNH"):
        number = f"Q{number[3:]}"
    if not (len(number) in {4, 5} and number[-4:].isdigit()):
        return None
    number = number.lstrip("AQ")
    if number[0] in ("2", "3"):
        number = f"{number[:2]}.{number[2:]}"
    elif number[0] not in ("0", "1"):
        return None
    return core.make_number(number, value, number, literal=True)

Parse an altimeter string into a Number.

def get_altimeter( data: list[str], units: avwx.structs.Units, version: str = 'NA') -> tuple[list[str], avwx.structs.Number | None]:
def get_altimeter(data: list[str], units: Units, version: str = "NA") -> tuple[list[str], Number | None]:
    """Return the report list and the removed altimeter item.

    Up to two trailing elements are consumed while they parse as altimeters.
    Version is 'NA' (North American / default), which selects the smallest
    parsed value, or 'IN' (International), which selects the largest.
    `units.altimeter` is set from the magnitude of the selected value.
    """
    values: list[Number] = []
    while data and len(values) < 2:
        parsed = parse_altimeter(data[-1])
        if parsed is None:
            break
        values.append(parsed)
        data.pop()
    if not values:
        return data, None
    ordered = sorted(values, key=lambda number: number.value or 0)
    altimeter = ordered[0] if version == "NA" else ordered[-1]
    if altimeter.value is not None:
        units.altimeter = "inHg" if altimeter.value < 100 else "hPa"
    return data, altimeter

Return the report list and the removed altimeter item.

Version is 'NA' (North American / default) or 'IN' (International)

def get_temp_and_dew(data: list[str]) -> tuple[list[str], avwx.structs.Number | None, avwx.structs.Number | None]:
def get_temp_and_dew(
    data: list[str],
) -> tuple[list[str], Number | None, Number | None]:
    """Return the report list plus the temperature and dewpoint taken from it.

    The list is searched from the end for the first ``temp/dew`` element whose
    two halves are each either a missing marker (``MM`` / ``XX``) or accepted
    by ``core.is_possible_temp``. That element is removed from ``data`` in
    place and both halves are passed through ``core.make_number``. When no
    element qualifies, the list is returned untouched with two ``None``s.
    """
    for index in reversed(range(len(data))):
        element = data[index]
        if "/" not in element:
            continue
        # ///07 -> /07
        if element.startswith("/"):
            element = "/" + element.lstrip("/")
        # 07/// -> 07/
        elif element.endswith("/"):
            element = element.rstrip("/") + "/"
        parts = element.split("/")
        if len(parts) != 2:
            continue
        cleaned: list[str] = []
        for part in parts:
            if part in ("MM", "XX"):
                # Missing markers become empty strings
                cleaned.append("")
            elif core.is_possible_temp(part):
                cleaned.append(part)
            else:
                break
        else:
            # Both halves passed: this is the temperature/dewpoint element
            del data[index]
            temp_text, dew_text = cleaned
            return data, core.make_number(temp_text), core.make_number(dew_text)
    return data, None, None

Return the report list and the removed temperature and dewpoint, each converted with core.make_number.

def get_relative_humidity(temperature: avwx.structs.Number | None, dewpoint: avwx.structs.Number | None, remarks_info: avwx.structs.RemarksData | None, units: avwx.structs.Units) -> float | None:
def get_relative_humidity(
    temperature: Number | None,
    dewpoint: Number | None,
    remarks_info: RemarksData | None,
    units: Units,
) -> float | None:
    """Calculate relative humidity from the preferred temperature and dewpoint.

    The decimal temperature and dewpoint from ``remarks_info`` take priority
    over the report values when they are present. Returns ``None`` when either
    reading, or its numeric value, is unavailable.
    """
    temp, dew = temperature, dewpoint
    if remarks_info is not None:
        temp = remarks_info.temperature_decimal or temp
        dew = remarks_info.dewpoint_decimal or dew
    for reading in (temp, dew):
        if reading is None or reading.value is None:
            return None
    return core.relative_humidity(temp.value, dew.value, units.temperature)

Calculate relative humidity from preferred temperature and dewpoint.

def sanitize(report: str) -> tuple[str, str, list[str], avwx.structs.Sanitization]:
def sanitize(report: str) -> tuple[str, str, list[str], Sanitization]:
    """Return a sanitized report, remarks, and elements ready for parsing.

    The report string is cleaned, split from its remarks with ``get_remarks``,
    de-duplicated, and cleaned again at the element level. The returned tuple
    is the rejoined sanitized string (with the remarks appended when there are
    any), the remarks string, the element list, and the ``Sanitization``
    object that was passed to both cleaning steps.
    """
    changes = Sanitization()
    body, remark_str = get_remarks(clean_metar_string(report, changes))
    elements = clean_metar_list(core.dedupe(body), changes)
    joined = " ".join(elements)
    sanitized = f"{joined} {remark_str}" if remark_str else joined
    return sanitized, remark_str, elements, changes

Return a sanitized report, remarks, and elements ready for parsing.

def parse(station: str, report: str, issued: datetime.date | None = None, use_na: bool | None = None) -> tuple[avwx.structs.MetarData | None, avwx.structs.Units | None, avwx.structs.Sanitization | None]:
def parse(
    station: str,
    report: str,
    issued: date | None = None,
    use_na: bool | None = None,
) -> tuple[MetarData | None, Units | None, Sanitization | None]:
    """Return MetarData, Units, and Sanitization objects for a raw report.

    ``station`` is passed to ``valid_station`` before anything else. An empty
    report yields ``(None, None, None)``. When ``use_na`` is not given, the
    variant is chosen by ``uses_na_format`` from the first two characters of
    the station; the report is then handed to ``parse_na`` or ``parse_in``.
    """
    valid_station(station)
    if not report:
        return None, None, None
    north_american = uses_na_format(station[:2]) if use_na is None else use_na
    if north_american:
        return parse_na(report, issued)
    return parse_in(report, issued)

Return MetarData, Units, and Sanitization objects with the parsed data and their associated units. Returns (None, None, None) when the report is empty.

def parse_na(report: str, issued: datetime.date | None = None) -> tuple[avwx.structs.MetarData, avwx.structs.Units, avwx.structs.Sanitization]:
def parse_na(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]:
    """Parser for the North American METAR variant.

    Starts from ``Units.north_american()`` and peels the report apart one
    group at a time. Each extraction step removes what it recognizes from the
    element list, so the order of the calls below matters. Returns the
    populated ``MetarData``, the units, and the sanitization object produced
    by ``sanitize``.
    """
    units = Units.north_american()
    sanitized, remarks_str, elements, changes = sanitize(report)
    elements, station, time_text = core.get_station_and_time(elements)
    elements, runway_vis = get_runway_visibility(elements)
    elements, clouds = core.get_clouds(elements)
    elements, wind_dir, wind_speed, wind_gust, wind_var_dir = core.get_wind(elements, units)
    elements, altimeter = get_altimeter(elements, units, "NA")
    elements, visibility = core.get_visibility(elements, units)
    elements, temperature, dewpoint = get_temp_and_dew(elements)
    rules_index = core.get_flight_rules(visibility, core.get_ceiling(clouds))
    # Whatever is still unclaimed is split into weather codes and leftovers
    leftovers, wx_codes = get_wx_codes(elements)
    remarks_info = remarks.parse(remarks_str)
    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
    flight_rules = FLIGHT_RULES[rules_index]
    timestamp = core.make_timestamp(time_text, target_date=issued)
    parsed = MetarData(
        # Report text
        raw=report,
        sanitized=sanitized,
        remarks=remarks_str,
        remarks_info=remarks_info,
        other=leftovers,
        # Identification
        station=station,
        time=timestamp,
        # Wind
        wind_direction=wind_dir,
        wind_speed=wind_speed,
        wind_gust=wind_gust,
        wind_variable_direction=wind_var_dir,
        # Sky and visibility
        visibility=visibility,
        runway_visibility=runway_vis,
        clouds=clouds,
        wx_codes=wx_codes,
        flight_rules=flight_rules,
        # Temperature and pressure
        temperature=temperature,
        dewpoint=dewpoint,
        relative_humidity=humidity,
        altimeter=altimeter,
    )
    return parsed, units, changes

Parser for the North American METAR variant.

def parse_in(report: str, issued: datetime.date | None = None) -> tuple[avwx.structs.MetarData, avwx.structs.Units, avwx.structs.Sanitization]:
def parse_in(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]:
    """Parser for the International METAR variant.

    Starts from ``Units.international()`` and peels the report apart one group
    at a time. Each extraction step removes what it recognizes from the
    element list, so the order of the calls below matters. A ``CAVOK`` element
    replaces both the cloud and the visibility groups: clouds become an empty
    list and visibility is built from the literal ``"CAVOK"``. Returns the
    populated ``MetarData``, the units, and the sanitization object produced
    by ``sanitize``.
    """
    units = Units.international()
    sanitized, remarks_str, elements, changes = sanitize(report)
    elements, station, time_text = core.get_station_and_time(elements)
    elements, runway_vis = get_runway_visibility(elements)
    # Cloud layers are only searched for when the report does not carry CAVOK
    if "CAVOK" not in elements:
        elements, clouds = core.get_clouds(elements)
    elements, wind_dir, wind_speed, wind_gust, wind_var_dir = core.get_wind(elements, units)
    elements, altimeter = get_altimeter(elements, units, "IN")
    if "CAVOK" not in elements:
        elements, visibility = core.get_visibility(elements, units)
    else:
        visibility = core.make_number("CAVOK")
        clouds = []
        elements.remove("CAVOK")
    elements, temperature, dewpoint = get_temp_and_dew(elements)
    rules_index = core.get_flight_rules(visibility, core.get_ceiling(clouds))
    # Whatever is still unclaimed is split into weather codes and leftovers
    leftovers, wx_codes = get_wx_codes(elements)
    remarks_info = remarks.parse(remarks_str)
    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
    flight_rules = FLIGHT_RULES[rules_index]
    timestamp = core.make_timestamp(time_text, target_date=issued)
    parsed = MetarData(
        # Report text
        raw=report,
        sanitized=sanitized,
        remarks=remarks_str,
        remarks_info=remarks_info,
        other=leftovers,
        # Identification
        station=station,
        time=timestamp,
        # Wind
        wind_direction=wind_dir,
        wind_speed=wind_speed,
        wind_gust=wind_gust,
        wind_variable_direction=wind_var_dir,
        # Sky and visibility
        visibility=visibility,
        runway_visibility=runway_vis,
        clouds=clouds,
        wx_codes=wx_codes,
        flight_rules=flight_rules,
        # Temperature and pressure
        temperature=temperature,
        dewpoint=dewpoint,
        relative_humidity=humidity,
        altimeter=altimeter,
    )
    return parsed, units, changes

Parser for the International METAR variant.