avwx.current.metar
A METAR (Meteorological Aerodrome Report) is the surface weather observed at most controlled (and some uncontrolled) airports. They are updated once per hour or when conditions change enough to warrant an update, and the observations are valid for one hour after the report was issued or until the next report is issued.
1""" 2A METAR (Meteorological Aerodrome Report) is the surface weather observed at 3most controlled (and some uncontrolled) airports. They are updated once per 4hour or when conditions change enough to warrant an update, and the 5observations are valid for one hour after the report was issued or until the 6next report is issued. 7""" 8 9# stdlib 10from __future__ import annotations 11 12from contextlib import suppress 13from datetime import date, datetime, timedelta, timezone 14 15# module 16from avwx.current.base import Report, get_wx_codes 17from avwx.parsing import core, remarks, speech, summary 18from avwx.parsing.sanitization.metar import clean_metar_list, clean_metar_string 19from avwx.parsing.translate.metar import translate_metar 20from avwx.service import Noaa 21from avwx.static.core import FLIGHT_RULES 22from avwx.static.metar import METAR_RMK 23from avwx.station import uses_na_format, valid_station 24from avwx.structs import ( 25 Code, 26 MetarData, 27 MetarTrans, 28 Number, 29 RemarksData, 30 RunwayVisibility, 31 Sanitization, 32 Units, 33) 34 35 36class Metar(Report): 37 """The Metar class offers an object-oriented approach to managing METAR data 38 for a single station. 39 40 Below is typical usage for fetching and pulling METAR data for KJFK. 41 42 ```python 43 >>> from avwx import Metar 44 >>> kjfk = Metar("KJFK") 45 >>> kjfk.station.name 46 'John F Kennedy International Airport' 47 >>> kjfk.update() 48 True 49 >>> kjfk.last_updated 50 datetime.datetime(2018, 3, 4, 23, 36, 6, 62376) 51 >>> kjfk.raw 52 'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078' 53 >>> kjfk.data.flight_rules 54 'VFR' 55 >>> kjfk.translations.remarks 56 {'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'} 57 ``` 58 59 The `parse` and `from_report` methods can parse a report string if you want 60 to override the normal fetching process. 
Here's an example of a really bad 61 day. 62 63 ```python 64 >>> from avwx import Metar 65 >>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED' 66 >>> ksfo = Metar.from_report(report) 67 True 68 >>> ksfo.station.city 69 'San Francisco' 70 >>> ksfo.last_updated 71 datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc) 72 >>> ksfo.data.flight_rules 73 'LIFR' 74 >>> ksfo.translations.clouds 75 'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL' 76 >>> ksfo.summary 77 'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft' 78 ``` 79 """ 80 81 data: MetarData | None = None 82 translations: MetarTrans | None = None 83 84 async def _pull_from_default(self) -> None: 85 """Check for a more recent report from NOAA.""" 86 service = Noaa(self.__class__.__name__.lower()) 87 if self.code is None: 88 return 89 report = await service.async_fetch(self.code) 90 if report is not None: 91 data, units, sans = parse(self.code, report, self.issued) 92 if not data or data.time is None or data.time.dt is None: 93 return 94 if not self.data or self.data.time is None or self.data.time.dt is None or data.time.dt > self.data.time.dt: 95 self.data, self.units, self.sanitization = data, units, sans 96 self.source = service.root 97 98 @property 99 def _should_check_default(self) -> bool: 100 """Return True if pulled from regional source and potentially out of date.""" 101 if isinstance(self.service, Noaa) or self.source is None: 102 return False 103 104 if self.data is None or self.data.time is None or self.data.time.dt is None: 105 return True 106 time_since = datetime.now(tz=timezone.utc) - self.data.time.dt 107 return time_since > timedelta(minutes=90) 108 109 def _calculate_altitudes(self) -> None: 110 """Add 
the pressure and density altitudes to data if all fields are available.""" 111 if self.data is None or self.station is None or self.units is None: 112 return 113 # Select decimal temperature if available 114 temp = self.data.temperature 115 if self.data.remarks_info is not None: 116 temp = self.data.remarks_info.temperature_decimal or temp 117 alt = self.data.altimeter 118 if temp is None or temp.value is None or alt is None or alt.value is None: 119 return 120 elev = self.station.elevation_ft 121 if elev is None: 122 return 123 self.data.pressure_altitude = core.pressure_altitude(alt.value, elev, self.units.altimeter) 124 self.data.density_altitude = core.density_altitude(alt.value, temp.value, elev, self.units) 125 126 async def _post_update(self) -> None: 127 if self.code is None or self.raw is None: 128 return 129 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 130 if self._should_check_default: 131 await self._pull_from_default() 132 if self.data is None or self.units is None: 133 return 134 self._calculate_altitudes() 135 self.translations = translate_metar(self.data, self.units) 136 137 def _post_parse(self) -> None: 138 if self.code is None or self.raw is None: 139 return 140 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 141 if self.data is None or self.units is None: 142 return 143 self._calculate_altitudes() 144 self.translations = translate_metar(self.data, self.units) 145 146 @staticmethod 147 def sanitize(report: str) -> str: 148 """Sanitize a METAR string.""" 149 return sanitize(report)[0] 150 151 @property 152 def summary(self) -> str | None: 153 """Condensed report summary created from translations.""" 154 if not self.translations: 155 self.update() 156 return None if self.translations is None else summary.metar(self.translations) 157 158 @property 159 def speech(self) -> str | None: 160 """Report summary designed to be read by a text-to-speech program.""" 161 if not self.data: 
162 self.update() 163 if self.data is None or self.units is None: 164 return None 165 return speech.metar(self.data, self.units) 166 167 168def get_remarks(txt: str) -> tuple[list[str], str]: 169 """Return the report split into components and the remarks string. 170 171 Remarks can include items like RMK and on, NOSIG and on, and BECMG and on 172 """ 173 txt = txt.replace("?", "").strip() 174 # First look for Altimeter in txt 175 alt_index = len(txt) + 1 176 for item in [" A2", " A3", " Q1", " Q0", " Q9"]: 177 index = txt.find(item) 178 if len(txt) - 6 > index > -1 and txt[index + 2 : index + 6].isdigit(): 179 alt_index = index 180 # Then look for earliest remarks 'signifier' 181 sig_index = core.find_first_in_list(txt, METAR_RMK) 182 if sig_index == -1: 183 sig_index = len(txt) + 1 184 if sig_index > alt_index > -1: 185 return txt[: alt_index + 6].strip().split(), txt[alt_index + 7 :] 186 if alt_index > sig_index > -1: 187 return txt[:sig_index].strip().split(), txt[sig_index + 1 :] 188 return txt.strip().split(), "" 189 190 191_RVR_CODES = { 192 "M": "less than", 193 "A": "greater than", 194 "P": "greater than", 195 "U": "increasing", 196 "I": "increasing", 197 "D": "decreasing", 198 "F": "decreasing", 199 "R": "decreasing", 200 "N": "no change", 201 "V": "variable", 202} 203 204 205def _parse_rvr_number(value: str) -> Number | None: 206 if not value: 207 return None 208 raw, prefix = value, None 209 with suppress(KeyError): 210 prefix = _RVR_CODES[value[0]] 211 value = value[1:] 212 number = core.make_number(value, raw) 213 if number is not None and prefix is not None: 214 number.spoken = f"{prefix} {number.spoken}" 215 number.value = None 216 return number 217 218 219def parse_runway_visibility(value: str) -> RunwayVisibility: 220 """Parse a runway visibility range string.""" 221 raw, trend = value, None 222 # TODO: update to check and convert units post visibility parse 223 value = value.replace("FT", "") 224 with suppress(KeyError): 225 trend = 
Code(value[-1], _RVR_CODES[value[-1]]) 226 value = value[:-1] 227 runway, value, *_ = value[1:].split("/") 228 if value: 229 possible_numbers = [_parse_rvr_number(n) for n in value.split("V")] 230 numbers = [n for n in possible_numbers if n is not None] 231 visibility = numbers.pop() if len(numbers) == 1 else None 232 else: 233 visibility, numbers = None, [] 234 return RunwayVisibility( 235 repr=raw, 236 runway=runway, 237 visibility=visibility, 238 variable_visibility=numbers, 239 trend=trend, 240 ) 241 242 243def get_runway_visibility(data: list[str]) -> tuple[list[str], list[RunwayVisibility]]: 244 """Return the report list and the remove runway visibility list.""" 245 runway_vis = [ 246 parse_runway_visibility(data.pop(i)) 247 for i, item in reversed(list(enumerate(data))) 248 if core.is_runway_visibility(item) 249 ] 250 runway_vis.sort(key=lambda x: x.runway) 251 return data, runway_vis 252 253 254def parse_altimeter(value: str | None) -> Number | None: 255 """Parse an altimeter string into a Number.""" 256 if not value or len(value) < 4: 257 return None 258 # QNH3003INS 259 if len(value) >= 7 and value.endswith("INS"): 260 return core.make_number(f"{value[-7:-5]}.{value[-5:-3]}", value, literal=True) 261 number = value.replace(".", "") 262 # Q1000/10 263 if "/" in number: 264 number = number.split("/")[0] 265 if number.startswith("QNH"): 266 number = f"Q{number[1:]}" 267 if not (len(number) in {4, 5} and number[-4:].isdigit()): 268 return None 269 number = number.lstrip("AQ") 270 if number[0] in ("2", "3"): 271 number = f"{number[:2]}.{number[2:]}" 272 elif number[0] not in ("0", "1"): 273 return None 274 return core.make_number(number, value, number, literal=True) 275 276 277def get_altimeter(data: list[str], units: Units, version: str = "NA") -> tuple[list[str], Number | None]: 278 """Return the report list and the removed altimeter item. 
279 280 Version is 'NA' (North American / default) or 'IN' (International) 281 """ 282 values: list[Number] = [] 283 for _ in range(2): 284 if not data: 285 break 286 value = parse_altimeter(data[-1]) 287 if value is None: 288 break 289 values.append(value) 290 data.pop(-1) 291 if not values: 292 return data, None 293 values.sort(key=lambda x: x.value or 0) 294 altimeter = values[0 if version == "NA" else -1] 295 if altimeter.value is not None: 296 units.altimeter = "inHg" if altimeter.value < 100 else "hPa" 297 return data, altimeter 298 299 300def get_temp_and_dew( 301 data: list[str], 302) -> tuple[list[str], Number | None, Number | None]: 303 """Return the report list and removed temperature and dewpoint strings.""" 304 for i, item in reversed(list(enumerate(data))): 305 if "/" in item: 306 # ///07 307 if item[0] == "/": 308 item = "/" + item.lstrip("/") # noqa: PLW2901 309 # 07/// 310 elif item[-1] == "/": 311 item = item.rstrip("/") + "/" # noqa: PLW2901 312 tempdew = item.split("/") 313 if len(tempdew) != 2: 314 continue 315 valid = True 316 for j, temp in enumerate(tempdew): 317 if temp in ["MM", "XX"]: 318 tempdew[j] = "" 319 elif not core.is_possible_temp(temp): 320 valid = False 321 break 322 if valid: 323 data.pop(i) 324 temp, dew = tempdew 325 return data, core.make_number(temp), core.make_number(dew) 326 return data, None, None 327 328 329def get_relative_humidity( 330 temperature: Number | None, 331 dewpoint: Number | None, 332 remarks_info: RemarksData | None, 333 units: Units, 334) -> float | None: 335 """Calculate relative humidity from preferred temperature and dewpoint.""" 336 if remarks_info is not None: 337 temp = remarks_info.temperature_decimal or temperature 338 dew = remarks_info.dewpoint_decimal or dewpoint 339 else: 340 temp = temperature 341 dew = dewpoint 342 if temp is None or temp.value is None: 343 return None 344 if dew is None or dew.value is None: 345 return None 346 return core.relative_humidity(temp.value, dew.value, 
units.temperature) 347 348 349def sanitize(report: str) -> tuple[str, str, list[str], Sanitization]: 350 """Return a sanitized report, remarks, and elements ready for parsing.""" 351 sans = Sanitization() 352 clean = clean_metar_string(report, sans) 353 data, remark_str = get_remarks(clean) 354 data = core.dedupe(data) 355 data = clean_metar_list(data, sans) 356 clean = " ".join(data) 357 if remark_str: 358 clean += f" {remark_str}" 359 return clean, remark_str, data, sans 360 361 362def parse( 363 station: str, 364 report: str, 365 issued: date | None = None, 366 use_na: bool | None = None, 367) -> tuple[MetarData | None, Units | None, Sanitization | None]: 368 """Return MetarData and Units dataclasses with parsed data and their associated units.""" 369 valid_station(station) 370 if not report: 371 return None, None, None 372 if use_na is None: 373 use_na = uses_na_format(station[:2]) 374 parser = parse_na if use_na else parse_in 375 return parser(report, issued) 376 377 378def parse_na(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]: 379 """Parser for the North American METAR variant.""" 380 units = Units.north_american() 381 sanitized, remarks_str, data, sans = sanitize(report) 382 data, station, time = core.get_station_and_time(data) 383 data, runway_visibility = get_runway_visibility(data) 384 data, clouds = core.get_clouds(data) 385 ( 386 data, 387 wind_direction, 388 wind_speed, 389 wind_gust, 390 wind_variable_direction, 391 ) = core.get_wind(data, units) 392 data, altimeter = get_altimeter(data, units, "NA") 393 data, visibility = core.get_visibility(data, units) 394 data, temperature, dewpoint = get_temp_and_dew(data) 395 condition = core.get_flight_rules(visibility, core.get_ceiling(clouds)) 396 other, wx_codes = get_wx_codes(data) 397 remarks_info = remarks.parse(remarks_str) 398 humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units) 399 struct = MetarData( 400 altimeter=altimeter, 401 
clouds=clouds, 402 dewpoint=dewpoint, 403 flight_rules=FLIGHT_RULES[condition], 404 other=other, 405 raw=report, 406 relative_humidity=humidity, 407 remarks_info=remarks_info, 408 remarks=remarks_str, 409 runway_visibility=runway_visibility, 410 sanitized=sanitized, 411 station=station, 412 temperature=temperature, 413 time=core.make_timestamp(time, target_date=issued), 414 visibility=visibility, 415 wind_direction=wind_direction, 416 wind_gust=wind_gust, 417 wind_speed=wind_speed, 418 wind_variable_direction=wind_variable_direction, 419 wx_codes=wx_codes, 420 ) 421 return struct, units, sans 422 423 424def parse_in(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]: 425 """Parser for the International METAR variant.""" 426 units = Units.international() 427 sanitized, remarks_str, data, sans = sanitize(report) 428 data, station, time = core.get_station_and_time(data) 429 data, runway_visibility = get_runway_visibility(data) 430 if "CAVOK" not in data: 431 data, clouds = core.get_clouds(data) 432 ( 433 data, 434 wind_direction, 435 wind_speed, 436 wind_gust, 437 wind_variable_direction, 438 ) = core.get_wind(data, units) 439 data, altimeter = get_altimeter(data, units, "IN") 440 if "CAVOK" in data: 441 visibility = core.make_number("CAVOK") 442 clouds = [] 443 data.remove("CAVOK") 444 else: 445 data, visibility = core.get_visibility(data, units) 446 data, temperature, dewpoint = get_temp_and_dew(data) 447 condition = core.get_flight_rules(visibility, core.get_ceiling(clouds)) 448 other, wx_codes = get_wx_codes(data) 449 remarks_info = remarks.parse(remarks_str) 450 humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units) 451 struct = MetarData( 452 altimeter=altimeter, 453 clouds=clouds, 454 dewpoint=dewpoint, 455 flight_rules=FLIGHT_RULES[condition], 456 other=other, 457 raw=report, 458 relative_humidity=humidity, 459 remarks_info=remarks_info, 460 remarks=remarks_str, 461 runway_visibility=runway_visibility, 462 
sanitized=sanitized, 463 station=station, 464 temperature=temperature, 465 time=core.make_timestamp(time, target_date=issued), 466 visibility=visibility, 467 wind_direction=wind_direction, 468 wind_gust=wind_gust, 469 wind_speed=wind_speed, 470 wind_variable_direction=wind_variable_direction, 471 wx_codes=wx_codes, 472 ) 473 return struct, units, sans
class Metar(Report):
    """The Metar class offers an object-oriented approach to managing METAR data
    for a single station.

    Below is typical usage for fetching and pulling METAR data for KJFK.

    ```python
    >>> from avwx import Metar
    >>> kjfk = Metar("KJFK")
    >>> kjfk.station.name
    'John F Kennedy International Airport'
    >>> kjfk.update()
    True
    >>> kjfk.last_updated
    datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
    >>> kjfk.raw
    'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
    >>> kjfk.data.flight_rules
    'VFR'
    >>> kjfk.translations.remarks
    {'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process. Here's an example of a really bad
    day.

    ```python
    >>> from avwx import Metar
    >>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
    >>> ksfo = Metar.from_report(report)
    >>> ksfo.station.city
    'San Francisco'
    >>> ksfo.last_updated
    datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
    >>> ksfo.data.flight_rules
    'LIFR'
    >>> ksfo.translations.clouds
    'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
    >>> ksfo.summary
    'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
    ```
    """

    data: MetarData | None = None
    translations: MetarTrans | None = None

    async def _pull_from_default(self) -> None:
        """Fetch the NOAA report and adopt it when it is newer than the current data."""
        service = Noaa(self.__class__.__name__.lower())
        if self.code is None:
            return
        report = await service.async_fetch(self.code)
        if report is None:
            return
        fetched, units, sans = parse(self.code, report, self.issued)
        if not fetched or fetched.time is None or fetched.time.dt is None:
            return
        current = self.data
        current_is_timed = bool(current) and current.time is not None and current.time.dt is not None
        # Keep what we have when it carries a timestamp at least as recent as NOAA's
        if current_is_timed and fetched.time.dt <= current.time.dt:
            return
        self.data, self.units, self.sanitization = fetched, units, sans
        self.source = service.root

    @property
    def _should_check_default(self) -> bool:
        """Tell whether a non-NOAA report is missing a timestamp or older than 90 minutes."""
        if isinstance(self.service, Noaa) or self.source is None:
            return False
        if self.data is None or self.data.time is None:
            return True
        observed = self.data.time.dt
        if observed is None:
            return True
        age = datetime.now(tz=timezone.utc) - observed
        return age > timedelta(minutes=90)

    def _calculate_altitudes(self) -> None:
        """Fill in pressure and density altitude when every required input is present."""
        if self.data is None or self.station is None or self.units is None:
            return
        data, units = self.data, self.units
        # The remarks may carry a more precise decimal temperature
        temp = data.temperature
        info = data.remarks_info
        if info is not None:
            temp = info.temperature_decimal or temp
        alt = data.altimeter
        if temp is None or temp.value is None:
            return
        if alt is None or alt.value is None:
            return
        elev = self.station.elevation_ft
        if elev is None:
            return
        data.pressure_altitude = core.pressure_altitude(alt.value, elev, units.altimeter)
        data.density_altitude = core.density_altitude(alt.value, temp.value, elev, units)

    async def _post_update(self) -> None:
        """Parse the raw report, consult NOAA when warranted, then derive altitudes and translations."""
        if self.code is None or self.raw is None:
            return
        parsed = parse(self.code, self.raw, self.issued)
        self.data, self.units, self.sanitization = parsed
        if self._should_check_default:
            await self._pull_from_default()
        if self.data is not None and self.units is not None:
            self._calculate_altitudes()
            self.translations = translate_metar(self.data, self.units)

    def _post_parse(self) -> None:
        """Parse the raw report, then derive altitudes and translations."""
        if self.code is None or self.raw is None:
            return
        parsed = parse(self.code, self.raw, self.issued)
        self.data, self.units, self.sanitization = parsed
        if self.data is not None and self.units is not None:
            self._calculate_altitudes()
            self.translations = translate_metar(self.data, self.units)

    @staticmethod
    def sanitize(report: str) -> str:
        """Return only the sanitized report string for a METAR."""
        cleaned, *_ = sanitize(report)
        return cleaned

    @property
    def summary(self) -> str | None:
        """Condensed report summary built from the translations, updating first if there are none."""
        if not self.translations:
            self.update()
        if self.translations is None:
            return None
        return summary.metar(self.translations)

    @property
    def speech(self) -> str | None:
        """Report summary meant for a text-to-speech program, updating first if there is no data."""
        if not self.data:
            self.update()
        if self.data is not None and self.units is not None:
            return speech.metar(self.data, self.units)
        return None
The Metar class offers an object-oriented approach to managing METAR data for a single station.
Below is typical usage for fetching and pulling METAR data for KJFK.
>>> from avwx import Metar
>>> kjfk = Metar("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 36, 6, 62376)
>>> kjfk.raw
'KJFK 042251Z 32023G32KT 10SM BKN060 04/M08 A3008 RMK AO2 PK WND 32032/2251 SLP184 T00441078'
>>> kjfk.data.flight_rules
'VFR'
>>> kjfk.translations.remarks
{'AO2': 'Automated with precipitation sensor', 'SLP184': 'Sea level pressure: 1018.4 hPa', 'T00441078': 'Temperature 4.4°C and dewpoint -7.8°C'}
The `parse` and `from_report` methods can parse a report string if you want to override the normal fetching process. Here's an example of a really bad day.
>>> from avwx import Metar
>>> report = 'KSFO 031254Z 36024G55KT 320V040 1/8SM R06/0200D +TS VCFC OVC050 BKN040TCU 14/10 A2978 RMK AIRPORT CLOSED'
>>> ksfo = Metar.from_report(report)
True
>>> ksfo.station.city
'San Francisco'
>>> ksfo.last_updated
datetime.datetime(2018, 3, 4, 23, 54, 4, 353757, tzinfo=datetime.timezone.utc)
>>> ksfo.data.flight_rules
'LIFR'
>>> ksfo.translations.clouds
'Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft - Reported AGL'
>>> ksfo.summary
'Winds N-360 (variable 320 to 040) at 24kt gusting to 55kt, Vis 0.125sm, Temp 14C, Dew 10C, Alt 29.78inHg, Heavy Thunderstorm, Vicinity Funnel Cloud, Broken layer at 4000ft (Towering Cumulus), Overcast layer at 5000ft'
@staticmethod
def sanitize(report: str) -> str:
    """Return only the sanitized report string for a METAR."""
    cleaned, *_ = sanitize(report)
    return cleaned
Sanitize a METAR string.
@property
def summary(self) -> str | None:
    """Condensed report summary built from the translations, updating first if there are none."""
    if not self.translations:
        self.update()
    if self.translations is None:
        return None
    return summary.metar(self.translations)
Condensed report summary created from translations.
@property
def speech(self) -> str | None:
    """Report summary meant for a text-to-speech program, updating first if there is no data."""
    if not self.data:
        self.update()
    if self.data is not None and self.units is not None:
        return speech.metar(self.data, self.units)
    return None
Report summary designed to be read by a text-to-speech program.
def get_remarks(txt: str) -> tuple[list[str], str]:
    """Split a report into its main elements and its remarks string.

    The split happens at whichever comes first: the end of the altimeter
    element or the earliest remarks signifier from METAR_RMK. When neither
    is found, the whole report is returned as elements with empty remarks.
    """
    text = txt.replace("?", "").strip()
    not_found = len(text) + 1
    # Locate the altimeter element; a later prefix in this list overrides an earlier match
    alt_index = not_found
    for prefix in (" A2", " A3", " Q1", " Q0", " Q9"):
        found = text.find(prefix)
        if found < 0 or found >= len(text) - 6:
            continue
        if text[found + 2 : found + 6].isdigit():
            alt_index = found
    # Locate the earliest remarks signifier
    sig_index = core.find_first_in_list(text, METAR_RMK)
    if sig_index == -1:
        sig_index = not_found
    if -1 < alt_index < sig_index:
        body, remark_str = text[: alt_index + 6], text[alt_index + 7 :]
    elif -1 < sig_index < alt_index:
        body, remark_str = text[:sig_index], text[sig_index + 1 :]
    else:
        body, remark_str = text, ""
    return body.strip().split(), remark_str
Return the report split into components and the remarks string.
Remarks can include items like RMK and on, NOSIG and on, and BECMG and on
def parse_runway_visibility(value: str) -> RunwayVisibility:
    """Build a RunwayVisibility from a runway visibility range element.

    A trailing code letter found in _RVR_CODES becomes the trend. A single
    parsed value is stored as the visibility; otherwise the parsed values are
    stored as the variable visibility list.
    """
    original = value
    # TODO: update to check and convert units post visibility parse
    text = value.replace("FT", "")
    trend = None
    with suppress(KeyError):
        trend = Code(text[-1], _RVR_CODES[text[-1]])
        text = text[:-1]
    # Drop the leading character, then split runway from distance
    runway, distance, *_ = text[1:].split("/")
    visibility = None
    numbers: list[Number] = []
    if distance:
        parsed = map(_parse_rvr_number, distance.split("V"))
        numbers = [number for number in parsed if number is not None]
        if len(numbers) == 1:
            visibility = numbers.pop()
    return RunwayVisibility(
        repr=original,
        runway=runway,
        visibility=visibility,
        variable_visibility=numbers,
        trend=trend,
    )
Parse a runway visibility range string.
def get_runway_visibility(data: list[str]) -> tuple[list[str], list[RunwayVisibility]]:
    """Return the report list and the removed runway visibility list.

    Matching elements are popped from data in place and returned parsed,
    sorted by runway.
    """
    runway_vis: list[RunwayVisibility] = []
    # Walk backwards so popping never shifts an index still to be visited
    for index in range(len(data) - 1, -1, -1):
        if core.is_runway_visibility(data[index]):
            runway_vis.append(parse_runway_visibility(data.pop(index)))
    runway_vis.sort(key=lambda vis: vis.runway)
    return data, runway_vis
Return the report list and the removed runway visibility list.
def parse_altimeter(value: str | None) -> Number | None:
    """Parse an altimeter string into a Number.

    Returns None when the string is missing, shorter than four characters,
    or not shaped like an altimeter element.
    """
    if not value or len(value) < 4:
        return None
    # QNH3003INS
    if len(value) >= 7 and value.endswith("INS"):
        return core.make_number(f"{value[-7:-5]}.{value[-5:-3]}", value, literal=True)
    number = value.replace(".", "")
    # Q1000/10
    if "/" in number:
        number = number.split("/")[0]
    # QNH1013 -> Q1013: drop the whole three-letter prefix. Slicing from index 1
    # rebuilt the same string, so QNH values always failed the length check below.
    if number.startswith("QNH"):
        number = f"Q{number[3:]}"
    if not (len(number) in {4, 5} and number[-4:].isdigit()):
        return None
    number = number.lstrip("AQ")
    # Values starting with 2 or 3 get a decimal point after the second digit
    if number[0] in ("2", "3"):
        number = f"{number[:2]}.{number[2:]}"
    elif number[0] not in ("0", "1"):
        return None
    return core.make_number(number, value, number, literal=True)
Parse an altimeter string into a Number.
def get_altimeter(data: list[str], units: Units, version: str = "NA") -> tuple[list[str], Number | None]:
    """Pop up to two trailing altimeter elements and return the report list with the chosen one.

    Version is 'NA' (North American / default), which selects the lowest
    value, or 'IN' (International), which selects the highest. The altimeter
    unit on units is updated to match the selected value.
    """
    found: list[Number] = []
    while data and len(found) < 2:
        parsed = parse_altimeter(data[-1])
        if parsed is None:
            break
        found.append(parsed)
        data.pop()
    if not found:
        return data, None
    ordered = sorted(found, key=lambda number: number.value or 0)
    altimeter = ordered[0] if version == "NA" else ordered[-1]
    if altimeter.value is not None:
        # Values under 100 are treated as inches of mercury
        units.altimeter = "inHg" if altimeter.value < 100 else "hPa"
    return data, altimeter
Return the report list and the removed altimeter item.
Version is 'NA' (North American / default) or 'IN' (International)
def get_temp_and_dew(
    data: list[str],
) -> tuple[list[str], Number | None, Number | None]:
    """Return the report list with the temperature/dewpoint element removed, plus both values.

    Elements are searched from the end of the list. When no element
    qualifies, the list is returned untouched with None for both values.
    """
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        if "/" not in item:
            continue
        # Collapse runs of slashes at either end: ///07 -> /07 and 07/// -> 07/
        if item.startswith("/"):
            item = "/" + item.lstrip("/")
        elif item.endswith("/"):
            item = item.rstrip("/") + "/"
        parts = item.split("/")
        if len(parts) != 2:
            continue
        for position, part in enumerate(parts):
            if part in ("MM", "XX"):
                parts[position] = ""
            elif not core.is_possible_temp(part):
                break
        else:
            # Both halves were acceptable
            data.pop(index)
            temp, dew = parts
            return data, core.make_number(temp), core.make_number(dew)
    return data, None, None
Return the report list and removed temperature and dewpoint strings.
def get_relative_humidity(
    temperature: Number | None,
    dewpoint: Number | None,
    remarks_info: RemarksData | None,
    units: Units,
) -> float | None:
    """Compute relative humidity, preferring decimal readings from the remarks.

    Returns None when either the temperature or the dewpoint has no value.
    """
    temp, dew = temperature, dewpoint
    if remarks_info is not None:
        temp = remarks_info.temperature_decimal or temp
        dew = remarks_info.dewpoint_decimal or dew
    for reading in (temp, dew):
        if reading is None or reading.value is None:
            return None
    return core.relative_humidity(temp.value, dew.value, units.temperature)
Calculate relative humidity from preferred temperature and dewpoint.
def sanitize(report: str) -> tuple[str, str, list[str], Sanitization]:
    """Clean a report and return the sanitized string, remarks, element list, and sanitization record."""
    sans = Sanitization()
    cleaned = clean_metar_string(report, sans)
    elements, remark_str = get_remarks(cleaned)
    elements = clean_metar_list(core.dedupe(elements), sans)
    sanitized = " ".join(elements)
    if remark_str:
        sanitized = f"{sanitized} {remark_str}"
    return sanitized, remark_str, elements, sans
Return a sanitized report, remarks, and elements ready for parsing.
def parse(
    station: str,
    report: str,
    issued: date | None = None,
    use_na: bool | None = None,
) -> tuple[MetarData | None, Units | None, Sanitization | None]:
    """Parse a report with the North American or International parser.

    Returns the parsed data, its units, and the sanitization record, or three
    None values for an empty report. When use_na is None, the parser is
    chosen from the first two characters of the station.
    """
    valid_station(station)
    if not report:
        return None, None, None
    na_format = uses_na_format(station[:2]) if use_na is None else use_na
    if na_format:
        return parse_na(report, issued)
    return parse_in(report, issued)
Return the MetarData, Units, and Sanitization dataclasses holding the parsed data, its associated units, and the sanitization record.
def parse_na(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]:
    """Parse a report using the North American METAR variant.

    Each step removes the elements it recognizes from the working list, so
    the extraction order below is significant.
    """
    units = Units.north_american()
    sanitized, remarks_str, elements, sans = sanitize(report)
    elements, station, time = core.get_station_and_time(elements)
    elements, runway_visibility = get_runway_visibility(elements)
    elements, clouds = core.get_clouds(elements)
    elements, *wind = core.get_wind(elements, units)
    wind_direction, wind_speed, wind_gust, wind_variable_direction = wind
    elements, altimeter = get_altimeter(elements, units, "NA")
    elements, visibility = core.get_visibility(elements, units)
    elements, temperature, dewpoint = get_temp_and_dew(elements)
    ceiling = core.get_ceiling(clouds)
    condition = core.get_flight_rules(visibility, ceiling)
    # Whatever is left is split into weather codes and unrecognized elements
    other, wx_codes = get_wx_codes(elements)
    remarks_info = remarks.parse(remarks_str)
    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
    flight_rules = FLIGHT_RULES[condition]
    timestamp = core.make_timestamp(time, target_date=issued)
    struct = MetarData(
        altimeter=altimeter,
        clouds=clouds,
        dewpoint=dewpoint,
        flight_rules=flight_rules,
        other=other,
        raw=report,
        relative_humidity=humidity,
        remarks_info=remarks_info,
        remarks=remarks_str,
        runway_visibility=runway_visibility,
        sanitized=sanitized,
        station=station,
        temperature=temperature,
        time=timestamp,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wind_variable_direction=wind_variable_direction,
        wx_codes=wx_codes,
    )
    return struct, units, sans
Parser for the North American METAR variant.
def parse_in(report: str, issued: date | None = None) -> tuple[MetarData, Units, Sanitization]:
    """Parse a report using the International METAR variant.

    Each step removes the elements it recognizes from the working list, so
    the extraction order below is significant. A CAVOK element stands in for
    both visibility and clouds.
    """
    units = Units.international()
    sanitized, remarks_str, elements, sans = sanitize(report)
    elements, station, time = core.get_station_and_time(elements)
    elements, runway_visibility = get_runway_visibility(elements)
    # Cloud extraction is skipped when CAVOK is reported
    if "CAVOK" not in elements:
        elements, clouds = core.get_clouds(elements)
    elements, *wind = core.get_wind(elements, units)
    wind_direction, wind_speed, wind_gust, wind_variable_direction = wind
    elements, altimeter = get_altimeter(elements, units, "IN")
    if "CAVOK" in elements:
        visibility = core.make_number("CAVOK")
        clouds = []
        elements.remove("CAVOK")
    else:
        elements, visibility = core.get_visibility(elements, units)
    elements, temperature, dewpoint = get_temp_and_dew(elements)
    ceiling = core.get_ceiling(clouds)
    condition = core.get_flight_rules(visibility, ceiling)
    # Whatever is left is split into weather codes and unrecognized elements
    other, wx_codes = get_wx_codes(elements)
    remarks_info = remarks.parse(remarks_str)
    humidity = get_relative_humidity(temperature, dewpoint, remarks_info, units)
    flight_rules = FLIGHT_RULES[condition]
    timestamp = core.make_timestamp(time, target_date=issued)
    struct = MetarData(
        altimeter=altimeter,
        clouds=clouds,
        dewpoint=dewpoint,
        flight_rules=flight_rules,
        other=other,
        raw=report,
        relative_humidity=humidity,
        remarks_info=remarks_info,
        remarks=remarks_str,
        runway_visibility=runway_visibility,
        sanitized=sanitized,
        station=station,
        temperature=temperature,
        time=timestamp,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wind_variable_direction=wind_variable_direction,
        wx_codes=wx_codes,
    )
    return struct, units, sans
Parser for the International METAR variant.