avwx.current.taf
A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 5 statute miles from the reporting station. They are update once every three or six hours or when significant changes warrant an update, and the observations are valid for six hours or until the next report is issued
1""" 2A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 35 statute miles from the reporting station. They are update once every three or 4six hours or when significant changes warrant an update, and the observations 5are valid for six hours or until the next report is issued 6""" 7 8# stdlib 9from __future__ import annotations 10 11from contextlib import suppress 12from typing import TYPE_CHECKING 13 14# module 15from avwx.current.base import Report, get_wx_codes 16from avwx.parsing import core, speech, summary 17from avwx.parsing.remarks import parse as parse_remarks 18from avwx.parsing.sanitization.taf import clean_taf_list, clean_taf_string 19from avwx.parsing.translate.taf import translate_taf 20from avwx.static.core import FLIGHT_RULES 21from avwx.static.taf import TAF_NEWLINE, TAF_NEWLINE_STARTSWITH, TAF_RMK 22from avwx.station import uses_na_format, valid_station 23from avwx.structs import ( 24 Cloud, 25 Number, 26 Sanitization, 27 TafData, 28 TafLineData, 29 TafTrans, 30 Timestamp, 31 Units, 32) 33 34if TYPE_CHECKING: 35 from datetime import date 36 37 38class Taf(Report): 39 """ 40 The Taf class offers an object-oriented approach to managing TAF data for a 41 single station. 42 43 ```python 44 >>> from avwx import Taf 45 >>> kjfk = Taf("KJFK") 46 >>> kjfk.station.name 47 'John F Kennedy International Airport' 48 >>> kjfk.update() 49 True 50 >>> kjfk.last_updated 51 datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc) 52 >>> kjfk.raw 53 'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035' 54 >>> len(kjfk.data.forecast) 55 3 56 >>> kjfk.data.forecast[0].flight_rules 57 'VFR' 58 >>> kjfk.translations.forecast[0].wind 59 'NNW-330 at 16kt gusting to 27kt' 60 >>> kjfk.speech 61 'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft' 62 ``` 63 64 The `parse` and `from_report` methods can parse a report string if you want 65 to override the normal fetching process. 66 67 ```python 68 >>> from avwx import Taf 69 >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z" 70 >>> zyhb = Taf.from_report(report) 71 True 72 >>> zyhb.station.city 73 'Hulan' 74 >>> zyhb.data.remarks 75 'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z' 76 >>> zyhb.summary[-1] 77 'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft' 78 ``` 79 """ 80 81 data: TafData | None = None 82 translations: TafTrans | None = None # type: ignore 83 84 async def _post_update(self) -> None: 85 if self.code is None or self.raw is None: 86 return 87 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 88 if self.data is None or self.units is None: 89 return 90 self.translations = translate_taf(self.data, self.units) 91 92 def _post_parse(self) -> None: 93 if self.code is None or self.raw is None: 94 return 95 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 96 if self.data is None or self.units is None: 97 return 98 self.translations = translate_taf(self.data, self.units) 99 100 @property 101 def summary(self) -> list[str]: 102 """Condensed summary for each forecast created from translations.""" 103 if not self.translations: 104 self.update() 105 if self.translations is None or self.translations.forecast is None: 106 return [] 107 return [summary.taf(trans) for trans in self.translations.forecast] 108 109 @property 110 def speech(self) -> str | None: 111 """Report summary designed to be read by a text-to-speech program.""" 112 if not self.data: 113 self.update() 114 if self.data is None or self.units is None: 115 return None 116 return speech.taf(self.data, self.units) 117 118 119LINE_FIXES = { 120 "TEMP0": "TEMPO", 121 "TEMP O": "TEMPO", 122 "TMPO": "TEMPO", 123 "TE MPO": "TEMPO", 124 "TEMP ": "TEMPO ", 125 "T EMPO": "TEMPO", 126 " EMPO": " TEMPO", 127 "TEMO": "TEMPO", 128 "BECM G": "BECMG", 129 "BEMCG": "BECMG", 130 "BE CMG": "BECMG", 131 "B ECMG": "BECMG", 132 " BEC ": " BECMG ", 133 "BCEMG": "BECMG", 134 "BEMG": "BECMG", 135} 136 137 138def sanitize_line(txt: str, sans: Sanitization) -> str: 139 """Fix common mistakes with 'new line' signifiers so that they can be recognized.""" 140 for key, fix in LINE_FIXES.items(): 141 if key in txt: 142 txt = txt.replace(key, fix) 143 sans.log(key, fix) 144 # Fix when space is missing following new line signifiers 145 for item in ["BECMG", "TEMPO"]: 146 if item in txt and f"{item} " not in txt: 147 index = txt.find(item) + len(item) 148 txt = f"{txt[:index]} {txt[index:]}" 149 sans.extra_spaces_needed = True 150 return txt 151 152 153def get_taf_remarks(txt: str) -> tuple[str, str]: 154 """Return report and remarks separated if found.""" 155 remarks_start = core.find_first_in_list(txt, TAF_RMK) 156 if remarks_start == -1: 157 return txt, "" 158 remarks = txt[remarks_start:] 159 txt = txt[:remarks_start].strip() 160 return txt, remarks 161 162 163def get_alt_ice_turb( 164 data: list[str], 165) -> tuple[list[str], Number | None, list[str], list[str]]: 166 """Return the report list and removed: Altimeter string, Icing list, Turbulence list.""" 167 altimeter_number = None 168 icing, turbulence = [], [] 169 for i, item in reversed(list(enumerate(data))): 170 if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit(): 171 altimeter = data.pop(i)[3:7] 172 if altimeter[0] in ("2", "3"): 173 altimeter = f"{altimeter[:2]}.{altimeter[2:]}" 174 altimeter_number = core.make_number(altimeter, literal=True) 175 elif item.isdigit(): 176 if item[0] == "6": 177 icing.append(data.pop(i)) 178 elif item[0] == "5": 179 turbulence.append(data.pop(i)) 180 return data, altimeter_number, icing, turbulence 181 182 183def is_normal_time(item: str) -> bool: 184 """Return if the item looks like a valid TAF (1200/1400) time range.""" 185 return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit() 186 187 188def starts_new_line(item: str) -> bool: 189 """Returns True if the given element should start a new report line""" 190 if item in TAF_NEWLINE: 191 return True 192 return any(item.startswith(start) for start in TAF_NEWLINE_STARTSWITH) 193 194 195def split_taf(txt: str) -> list[str]: 196 """Split a TAF report into each distinct time period.""" 197 lines = [] 198 split = txt.split() 199 last_index = 0 200 e_splits = enumerate(split) 201 next(e_splits) 202 for i, item in e_splits: 203 if (starts_new_line(item) and not split[i - 1].startswith("PROB")) or ( 204 is_normal_time(item) and not starts_new_line(split[i - 1]) 205 ): 206 lines.append(" ".join(split[last_index:i])) 207 last_index = i 208 lines.append(" ".join(split[last_index:])) 209 return lines 210 211 212# TAF line report type and start/end times 213def get_type_and_times( 214 data: list[str], 215) -> tuple[list[str], str, str | None, str | None, str | None]: 216 """Extract the report type string, start time string, and end time string.""" 217 report_type, start_time, end_time, transition = "FROM", None, None, None 218 # TEMPO, BECMG, INTER 219 if data and data[0] in TAF_NEWLINE or len(data[0]) == 6 and data[0].startswith("PROB"): 220 report_type = data.pop(0) 221 if data: 222 item, length = data[0], len(data[0]) 223 # 1200/1306 224 if is_normal_time(item): 225 start_time, end_time = data.pop(0).split("/") 226 227 # 1200 1306 228 elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit(): 229 start_time = data.pop(0) 230 end_time = data.pop(0) 231 232 # 120000 233 elif length == 6 and item.isdigit() and item[-2:] == "00": 234 start_time = data.pop(0)[:4] 235 # FM120000 236 elif length > 7 and item.startswith("FM"): 237 report_type = "FROM" 238 if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit(): 239 start_time, end_time = data.pop(0)[2:].split("/") 240 elif item[2:8].isdigit(): 241 start_time = data.pop(0)[2:6] 242 # TL120600 243 if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit(): 244 end_time = data.pop(0)[2:6] 245 elif report_type == "BECMG" and length == 5: 246 # 1200/ 247 if item[-1] == "/" and item[:4].isdigit(): 248 start_time = data.pop(0)[:4] 249 # /1200 250 elif item[0] == "/" and item[1:].isdigit(): 251 end_time = data.pop(0)[1:] 252 if report_type == "BECMG": 253 transition, start_time, end_time = start_time, end_time, None 254 return data, report_type, start_time, end_time, transition 255 256 257def _is_tempo_or_prob(line: TafLineData) -> bool: 258 """Return True if report type is TEMPO or non-null probability.""" 259 return line.type == "TEMPO" or line.probability is not None 260 261 262def _get_next_time(lines: list[TafLineData], target: str) -> Timestamp | None: 263 """Returns the next normal time target value or empty""" 264 for line in lines: 265 if _is_tempo_or_prob(line): 266 continue 267 time = line.transition_start or getattr(line, target) if target == "start_time" else getattr(line, target) 268 if time: 269 return time # type: ignore 270 return None 271 272 273def find_missing_taf_times( 274 lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None 275) -> list[TafLineData]: 276 """Fix any missing time issues except for error/empty lines.""" 277 if not lines: 278 return lines 279 # Assign start time 280 lines[0].start_time = start 281 # Fix other times 282 last_fm_line = 0 283 for i, line in enumerate(lines): 284 if _is_tempo_or_prob(line): 285 continue 286 last_fm_line = i 287 # Search remaining lines to fill empty end or previous for empty start 288 for target, other, direc in (("start", "end", -1), ("end", "start", 1)): 289 target += "_time" # noqa: PLW2901 290 if not getattr(line, target): 291 setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")) 292 # Special case for final forcast 293 if last_fm_line: 294 lines[last_fm_line].end_time = end 295 # Reset original end time if still empty 296 if lines and not lines[0].end_time: 297 lines[0].end_time = end 298 return lines 299 300 301def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]: 302 """Return the report list and the remove wind shear.""" 303 shear = None 304 for i, item in reversed(list(enumerate(data))): 305 if len(item) > 6 and item.startswith("WS") and item[5] == "/": 306 shear = data.pop(i).replace("KT", "") 307 return data, shear 308 309 310def get_temp_min_and_max( 311 data: list[str], 312) -> tuple[list[str], str | None, str | None]: 313 """Pull out Max temp at time and Min temp at time items from wx list.""" 314 temp_max, temp_min = "", "" 315 for i, item in reversed(list(enumerate(data))): 316 if len(item) > 6 and item[0] == "T" and "/" in item: 317 # TX12/1316Z 318 if item[1] == "X": 319 temp_max = data.pop(i) 320 # TNM03/1404Z 321 elif item[1] == "N": 322 temp_min = data.pop(i) 323 # TM03/1404Z T12/1316Z -> Will fix TN/TX 324 elif item[1] == "M" or item[1].isdigit(): 325 if temp_min: 326 if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int( 327 item[1 : item.find("/")].replace("M", "-") 328 ): 329 temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}" 330 else: 331 temp_max = f"TX{item[1:]}" 332 else: 333 temp_min = f"TN{item[1:]}" 334 data.pop(i) 335 return data, temp_max or None, temp_min or None 336 337 338def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]: 339 """Get Temperature and Altimeter lists for Oceania TAFs.""" 340 tlist: list[str] = [] 341 qlist: list[str] = [] 342 if "T" in data: 343 data, tlist = core.get_digit_list(data, data.index("T")) 344 if "Q" in data: 345 data, qlist = core.get_digit_list(data, data.index("Q")) 346 return data, tlist, qlist 347 348 349def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]: 350 """Get flight rules by looking for missing data in prior reports.""" 351 for i, line in enumerate(lines): 352 temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False 353 for report in reversed(lines[: i + 1]): 354 if not _is_tempo_or_prob(report): 355 if not temp_vis: 356 temp_vis = report.visibility 357 # SKC or CLR should force no clouds instead of looking back 358 if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK": 359 is_clear = True 360 elif temp_cloud == []: 361 temp_cloud = report.clouds 362 if temp_vis and temp_cloud != []: 363 break 364 if is_clear: 365 temp_cloud = [] 366 line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))] 367 return lines 368 369 370def fix_report_header(report: str) -> str: 371 """Correct the header order for key elements.""" 372 split_report = report.split() 373 374 # Limit scope to only the first few elements. Remarks may include similar tokens 375 header_length = min(len(split_report), 6) 376 headers = split_report[:header_length] 377 378 fixed_headers = [] 379 for target in ("TAF", "AMD", "COR"): 380 with suppress(ValueError): 381 headers.remove(target) 382 fixed_headers.append(target) 383 384 return " ".join(fixed_headers + headers + split_report[header_length:]) 385 386 387def _is_possible_start_end_time_slash(item: str) -> bool: 388 """Return True if item is a possible period start or end with missing element.""" 389 return len(item) == 5 and ( 390 # 1200/ 391 (item[-1] == "/" and item[:4].isdigit()) 392 or 393 # /1200 394 (item[0] == "/" and item[1:].isdigit()) 395 ) 396 397 398def parse( 399 station: str, report: str, issued: date | None = None 400) -> tuple[TafData | None, Units | None, Sanitization | None]: 401 """Return TafData and Units dataclasses with parsed data and their associated units.""" 402 if not report: 403 return None, None, None 404 valid_station(station) 405 report = fix_report_header(report) 406 is_amended, is_correction = False, False 407 while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "): 408 if report[:3] == "AMD": 409 is_amended = True 410 elif report[:3] == "COR": 411 is_correction = True 412 report = report[4:] 413 start_time: Timestamp | None = None 414 end_time: Timestamp | None = None 415 sans = Sanitization() 416 sanitized = clean_taf_string(report, sans) 417 _, new_station, time = core.get_station_and_time(sanitized[:20].split()) 418 if new_station is not None: 419 station = new_station 420 sanitized = sanitized.replace(station, "") 421 if time: 422 sanitized = sanitized.replace(time, "").strip() 423 units = Units.north_american() if uses_na_format(station) else Units.international() 424 # Find and remove remarks 425 sanitized, remarks = get_taf_remarks(sanitized) 426 if remarks.startswith("AMD"): 427 is_amended = True 428 # Split and parse each line 429 lines = split_taf(sanitized) 430 parsed_lines = parse_lines(lines, units, sans, issued) 431 # Perform additional info extract and corrections 432 max_temp: str | None = None 433 min_temp: str | None = None 434 if parsed_lines: 435 ( 436 parsed_lines[-1].other, 437 max_temp, 438 min_temp, 439 ) = get_temp_min_and_max(parsed_lines[-1].other) 440 if not (max_temp or min_temp): 441 ( 442 parsed_lines[0].other, 443 max_temp, 444 min_temp, 445 ) = get_temp_min_and_max(parsed_lines[0].other) 446 # Set start and end times based on the first line 447 start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time 448 parsed_lines[0].end_time = None 449 parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time) 450 parsed_lines = get_taf_flight_rules(parsed_lines) 451 # Extract Oceania-specific data 452 alts: list[str] | None = None 453 temps: list[str] | None = None 454 if station[0] == "A": 455 ( 456 parsed_lines[-1].other, 457 alts, 458 temps, 459 ) = get_oceania_temp_and_alt(parsed_lines[-1].other) 460 # Convert wx codes 461 for line in parsed_lines: 462 line.other, line.wx_codes = get_wx_codes(line.other) 463 sanitized = " ".join(i for i in (station, time, sanitized) if i) 464 struct = TafData( 465 raw=report, 466 sanitized=sanitized, 467 station=station, 468 time=core.make_timestamp(time, target_date=issued), 469 remarks=remarks, 470 remarks_info=parse_remarks(remarks), 471 forecast=parsed_lines, 472 start_time=start_time, 473 end_time=end_time, 474 is_amended=is_amended, 475 is_correction=is_correction, 476 max_temp=max_temp, 477 min_temp=min_temp, 478 alts=alts, 479 temps=temps, 480 ) 481 return struct, units, sans 482 483 484def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]: 485 """Return a list of parsed line dictionaries.""" 486 parsed_lines: list[TafLineData] = [] 487 prob = "" 488 while lines: 489 raw_line = lines[0].strip() 490 line = sanitize_line(raw_line, sans) 491 # Remove prob from the beginning of a line 492 if line.startswith("PROB"): 493 # Add standalone prob to next line 494 if len(line) == 6: 495 prob = line 496 line = "" 497 # Add to current line 498 elif len(line) > 6: 499 prob = line[:6] 500 line = line[6:].strip() 501 if line: 502 parsed_line = parse_line(line, units, sans, issued) 503 parsed_line.probability = None if " " in prob else core.make_number(prob[4:]) 504 parsed_line.raw = raw_line 505 if prob: 506 parsed_line.sanitized = f"{prob} {parsed_line.sanitized}" 507 prob = "" 508 parsed_lines.append(parsed_line) 509 lines.pop(0) 510 return parsed_lines 511 512 513def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData: 514 """Parser for the International TAF forcast variant.""" 515 data: list[str] = core.dedupe(line.split()) 516 # Grab original time piece under certain conditions to preserve a useful slash 517 old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None 518 data = clean_taf_list(data, sans) 519 if old_time and len(data) > 1 and data[1] == old_time.strip("/"): 520 data[1] = old_time 521 sanitized = " ".join(data) 522 data, report_type, start_time, end_time, transition = get_type_and_times(data) 523 data, wind_shear = get_wind_shear(data) 524 ( 525 data, 526 wind_direction, 527 wind_speed, 528 wind_gust, 529 wind_variable_direction, 530 ) = core.get_wind(data, units) 531 if "CAVOK" in data: 532 visibility = core.make_number("CAVOK") 533 clouds: list[Cloud] = [] 534 data.pop(data.index("CAVOK")) 535 else: 536 data, visibility = core.get_visibility(data, units) 537 data, clouds = core.get_clouds(data) 538 other, altimeter, icing, turbulence = get_alt_ice_turb(data) 539 return TafLineData( 540 altimeter=altimeter, 541 clouds=clouds, 542 flight_rules="", 543 other=other, 544 visibility=visibility, 545 wind_direction=wind_direction, 546 wind_gust=wind_gust, 547 wind_speed=wind_speed, 548 wx_codes=[], 549 end_time=core.make_timestamp(end_time, target_date=issued), 550 icing=icing, 551 probability=None, 552 raw=line, 553 sanitized=sanitized, 554 start_time=core.make_timestamp(start_time, target_date=issued), 555 transition_start=core.make_timestamp(transition, target_date=issued), 556 turbulence=turbulence, 557 type=report_type, 558 wind_shear=wind_shear, 559 wind_variable_direction=wind_variable_direction, 560 )
39class Taf(Report): 40 """ 41 The Taf class offers an object-oriented approach to managing TAF data for a 42 single station. 43 44 ```python 45 >>> from avwx import Taf 46 >>> kjfk = Taf("KJFK") 47 >>> kjfk.station.name 48 'John F Kennedy International Airport' 49 >>> kjfk.update() 50 True 51 >>> kjfk.last_updated 52 datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc) 53 >>> kjfk.raw 54 'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035' 55 >>> len(kjfk.data.forecast) 56 3 57 >>> kjfk.data.forecast[0].flight_rules 58 'VFR' 59 >>> kjfk.translations.forecast[0].wind 60 'NNW-330 at 16kt gusting to 27kt' 61 >>> kjfk.speech 62 'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft' 63 ``` 64 65 The `parse` and `from_report` methods can parse a report string if you want 66 to override the normal fetching process. 67 68 ```python 69 >>> from avwx import Taf 70 >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z" 71 >>> zyhb = Taf.from_report(report) 72 True 73 >>> zyhb.station.city 74 'Hulan' 75 >>> zyhb.data.remarks 76 'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z' 77 >>> zyhb.summary[-1] 78 'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft' 79 ``` 80 """ 81 82 data: TafData | None = None 83 translations: TafTrans | None = None # type: ignore 84 85 async def _post_update(self) -> None: 86 if self.code is None or self.raw is None: 87 return 88 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 89 if self.data is None or self.units is None: 90 return 91 self.translations = translate_taf(self.data, self.units) 92 93 def _post_parse(self) -> None: 94 if self.code is None or self.raw is None: 95 return 96 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 97 if self.data is None or self.units is None: 98 return 99 self.translations = translate_taf(self.data, self.units) 100 101 @property 102 def summary(self) -> list[str]: 103 """Condensed summary for each forecast created from translations.""" 104 if not self.translations: 105 self.update() 106 if self.translations is None or self.translations.forecast is None: 107 return [] 108 return [summary.taf(trans) for trans in self.translations.forecast] 109 110 @property 111 def speech(self) -> str | None: 112 """Report summary designed to be read by a text-to-speech program.""" 113 if not self.data: 114 self.update() 115 if self.data is None or self.units is None: 116 return None 117 return speech.taf(self.data, self.units)
The Taf class offers an object-oriented approach to managing TAF data for a single station.
>>> from avwx import Taf
>>> kjfk = Taf("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
>>> kjfk.raw
'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
>>> len(kjfk.data.forecast)
3
>>> kjfk.data.forecast[0].flight_rules
'VFR'
>>> kjfk.translations.forecast[0].wind
'NNW-330 at 16kt gusting to 27kt'
>>> kjfk.speech
'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
The parse
and from_report
methods can parse a report string if you want
to override the normal fetching process.
>>> from avwx import Taf
>>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
>>> zyhb = Taf.from_report(report)
True
>>> zyhb.station.city
'Hulan'
>>> zyhb.data.remarks
'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
>>> zyhb.summary[-1]
'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
101 @property 102 def summary(self) -> list[str]: 103 """Condensed summary for each forecast created from translations.""" 104 if not self.translations: 105 self.update() 106 if self.translations is None or self.translations.forecast is None: 107 return [] 108 return [summary.taf(trans) for trans in self.translations.forecast]
Condensed summary for each forecast created from translations.
110 @property 111 def speech(self) -> str | None: 112 """Report summary designed to be read by a text-to-speech program.""" 113 if not self.data: 114 self.update() 115 if self.data is None or self.units is None: 116 return None 117 return speech.taf(self.data, self.units)
Report summary designed to be read by a text-to-speech program.
139def sanitize_line(txt: str, sans: Sanitization) -> str: 140 """Fix common mistakes with 'new line' signifiers so that they can be recognized.""" 141 for key, fix in LINE_FIXES.items(): 142 if key in txt: 143 txt = txt.replace(key, fix) 144 sans.log(key, fix) 145 # Fix when space is missing following new line signifiers 146 for item in ["BECMG", "TEMPO"]: 147 if item in txt and f"{item} " not in txt: 148 index = txt.find(item) + len(item) 149 txt = f"{txt[:index]} {txt[index:]}" 150 sans.extra_spaces_needed = True 151 return txt
Fix common mistakes with 'new line' signifiers so that they can be recognized.
154def get_taf_remarks(txt: str) -> tuple[str, str]: 155 """Return report and remarks separated if found.""" 156 remarks_start = core.find_first_in_list(txt, TAF_RMK) 157 if remarks_start == -1: 158 return txt, "" 159 remarks = txt[remarks_start:] 160 txt = txt[:remarks_start].strip() 161 return txt, remarks
Return report and remarks separated if found.
164def get_alt_ice_turb( 165 data: list[str], 166) -> tuple[list[str], Number | None, list[str], list[str]]: 167 """Return the report list and removed: Altimeter string, Icing list, Turbulence list.""" 168 altimeter_number = None 169 icing, turbulence = [], [] 170 for i, item in reversed(list(enumerate(data))): 171 if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit(): 172 altimeter = data.pop(i)[3:7] 173 if altimeter[0] in ("2", "3"): 174 altimeter = f"{altimeter[:2]}.{altimeter[2:]}" 175 altimeter_number = core.make_number(altimeter, literal=True) 176 elif item.isdigit(): 177 if item[0] == "6": 178 icing.append(data.pop(i)) 179 elif item[0] == "5": 180 turbulence.append(data.pop(i)) 181 return data, altimeter_number, icing, turbulence
Return the report list and removed: Altimeter string, Icing list, Turbulence list.
184def is_normal_time(item: str) -> bool: 185 """Return if the item looks like a valid TAF (1200/1400) time range.""" 186 return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit()
Return if the item looks like a valid TAF (1200/1400) time range.
189def starts_new_line(item: str) -> bool: 190 """Returns True if the given element should start a new report line""" 191 if item in TAF_NEWLINE: 192 return True 193 return any(item.startswith(start) for start in TAF_NEWLINE_STARTSWITH)
Returns True if the given element should start a new report line
196def split_taf(txt: str) -> list[str]: 197 """Split a TAF report into each distinct time period.""" 198 lines = [] 199 split = txt.split() 200 last_index = 0 201 e_splits = enumerate(split) 202 next(e_splits) 203 for i, item in e_splits: 204 if (starts_new_line(item) and not split[i - 1].startswith("PROB")) or ( 205 is_normal_time(item) and not starts_new_line(split[i - 1]) 206 ): 207 lines.append(" ".join(split[last_index:i])) 208 last_index = i 209 lines.append(" ".join(split[last_index:])) 210 return lines
Split a TAF report into each distinct time period.
214def get_type_and_times( 215 data: list[str], 216) -> tuple[list[str], str, str | None, str | None, str | None]: 217 """Extract the report type string, start time string, and end time string.""" 218 report_type, start_time, end_time, transition = "FROM", None, None, None 219 # TEMPO, BECMG, INTER 220 if data and data[0] in TAF_NEWLINE or len(data[0]) == 6 and data[0].startswith("PROB"): 221 report_type = data.pop(0) 222 if data: 223 item, length = data[0], len(data[0]) 224 # 1200/1306 225 if is_normal_time(item): 226 start_time, end_time = data.pop(0).split("/") 227 228 # 1200 1306 229 elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit(): 230 start_time = data.pop(0) 231 end_time = data.pop(0) 232 233 # 120000 234 elif length == 6 and item.isdigit() and item[-2:] == "00": 235 start_time = data.pop(0)[:4] 236 # FM120000 237 elif length > 7 and item.startswith("FM"): 238 report_type = "FROM" 239 if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit(): 240 start_time, end_time = data.pop(0)[2:].split("/") 241 elif item[2:8].isdigit(): 242 start_time = data.pop(0)[2:6] 243 # TL120600 244 if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit(): 245 end_time = data.pop(0)[2:6] 246 elif report_type == "BECMG" and length == 5: 247 # 1200/ 248 if item[-1] == "/" and item[:4].isdigit(): 249 start_time = data.pop(0)[:4] 250 # /1200 251 elif item[0] == "/" and item[1:].isdigit(): 252 end_time = data.pop(0)[1:] 253 if report_type == "BECMG": 254 transition, start_time, end_time = start_time, end_time, None 255 return data, report_type, start_time, end_time, transition
Extract the report type string, start time string, and end time string.
274def find_missing_taf_times( 275 lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None 276) -> list[TafLineData]: 277 """Fix any missing time issues except for error/empty lines.""" 278 if not lines: 279 return lines 280 # Assign start time 281 lines[0].start_time = start 282 # Fix other times 283 last_fm_line = 0 284 for i, line in enumerate(lines): 285 if _is_tempo_or_prob(line): 286 continue 287 last_fm_line = i 288 # Search remaining lines to fill empty end or previous for empty start 289 for target, other, direc in (("start", "end", -1), ("end", "start", 1)): 290 target += "_time" # noqa: PLW2901 291 if not getattr(line, target): 292 setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")) 293 # Special case for final forcast 294 if last_fm_line: 295 lines[last_fm_line].end_time = end 296 # Reset original end time if still empty 297 if lines and not lines[0].end_time: 298 lines[0].end_time = end 299 return lines
Fix any missing time issues except for error/empty lines.
302def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]: 303 """Return the report list and the remove wind shear.""" 304 shear = None 305 for i, item in reversed(list(enumerate(data))): 306 if len(item) > 6 and item.startswith("WS") and item[5] == "/": 307 shear = data.pop(i).replace("KT", "") 308 return data, shear
Return the report list and the remove wind shear.
311def get_temp_min_and_max( 312 data: list[str], 313) -> tuple[list[str], str | None, str | None]: 314 """Pull out Max temp at time and Min temp at time items from wx list.""" 315 temp_max, temp_min = "", "" 316 for i, item in reversed(list(enumerate(data))): 317 if len(item) > 6 and item[0] == "T" and "/" in item: 318 # TX12/1316Z 319 if item[1] == "X": 320 temp_max = data.pop(i) 321 # TNM03/1404Z 322 elif item[1] == "N": 323 temp_min = data.pop(i) 324 # TM03/1404Z T12/1316Z -> Will fix TN/TX 325 elif item[1] == "M" or item[1].isdigit(): 326 if temp_min: 327 if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int( 328 item[1 : item.find("/")].replace("M", "-") 329 ): 330 temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}" 331 else: 332 temp_max = f"TX{item[1:]}" 333 else: 334 temp_min = f"TN{item[1:]}" 335 data.pop(i) 336 return data, temp_max or None, temp_min or None
Pull out Max temp at time and Min temp at time items from wx list.
339def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]: 340 """Get Temperature and Altimeter lists for Oceania TAFs.""" 341 tlist: list[str] = [] 342 qlist: list[str] = [] 343 if "T" in data: 344 data, tlist = core.get_digit_list(data, data.index("T")) 345 if "Q" in data: 346 data, qlist = core.get_digit_list(data, data.index("Q")) 347 return data, tlist, qlist
Get Temperature and Altimeter lists for Oceania TAFs.
350def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]: 351 """Get flight rules by looking for missing data in prior reports.""" 352 for i, line in enumerate(lines): 353 temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False 354 for report in reversed(lines[: i + 1]): 355 if not _is_tempo_or_prob(report): 356 if not temp_vis: 357 temp_vis = report.visibility 358 # SKC or CLR should force no clouds instead of looking back 359 if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK": 360 is_clear = True 361 elif temp_cloud == []: 362 temp_cloud = report.clouds 363 if temp_vis and temp_cloud != []: 364 break 365 if is_clear: 366 temp_cloud = [] 367 line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))] 368 return lines
Get flight rules by looking for missing data in prior reports.
371def fix_report_header(report: str) -> str: 372 """Correct the header order for key elements.""" 373 split_report = report.split() 374 375 # Limit scope to only the first few elements. Remarks may include similar tokens 376 header_length = min(len(split_report), 6) 377 headers = split_report[:header_length] 378 379 fixed_headers = [] 380 for target in ("TAF", "AMD", "COR"): 381 with suppress(ValueError): 382 headers.remove(target) 383 fixed_headers.append(target) 384 385 return " ".join(fixed_headers + headers + split_report[header_length:])
Correct the header order for key elements.
399def parse( 400 station: str, report: str, issued: date | None = None 401) -> tuple[TafData | None, Units | None, Sanitization | None]: 402 """Return TafData and Units dataclasses with parsed data and their associated units.""" 403 if not report: 404 return None, None, None 405 valid_station(station) 406 report = fix_report_header(report) 407 is_amended, is_correction = False, False 408 while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "): 409 if report[:3] == "AMD": 410 is_amended = True 411 elif report[:3] == "COR": 412 is_correction = True 413 report = report[4:] 414 start_time: Timestamp | None = None 415 end_time: Timestamp | None = None 416 sans = Sanitization() 417 sanitized = clean_taf_string(report, sans) 418 _, new_station, time = core.get_station_and_time(sanitized[:20].split()) 419 if new_station is not None: 420 station = new_station 421 sanitized = sanitized.replace(station, "") 422 if time: 423 sanitized = sanitized.replace(time, "").strip() 424 units = Units.north_american() if uses_na_format(station) else Units.international() 425 # Find and remove remarks 426 sanitized, remarks = get_taf_remarks(sanitized) 427 if remarks.startswith("AMD"): 428 is_amended = True 429 # Split and parse each line 430 lines = split_taf(sanitized) 431 parsed_lines = parse_lines(lines, units, sans, issued) 432 # Perform additional info extract and corrections 433 max_temp: str | None = None 434 min_temp: str | None = None 435 if parsed_lines: 436 ( 437 parsed_lines[-1].other, 438 max_temp, 439 min_temp, 440 ) = get_temp_min_and_max(parsed_lines[-1].other) 441 if not (max_temp or min_temp): 442 ( 443 parsed_lines[0].other, 444 max_temp, 445 min_temp, 446 ) = get_temp_min_and_max(parsed_lines[0].other) 447 # Set start and end times based on the first line 448 start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time 449 parsed_lines[0].end_time = None 450 parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time) 451 parsed_lines = get_taf_flight_rules(parsed_lines) 452 # Extract Oceania-specific data 453 alts: list[str] | None = None 454 temps: list[str] | None = None 455 if station[0] == "A": 456 ( 457 parsed_lines[-1].other, 458 alts, 459 temps, 460 ) = get_oceania_temp_and_alt(parsed_lines[-1].other) 461 # Convert wx codes 462 for line in parsed_lines: 463 line.other, line.wx_codes = get_wx_codes(line.other) 464 sanitized = " ".join(i for i in (station, time, sanitized) if i) 465 struct = TafData( 466 raw=report, 467 sanitized=sanitized, 468 station=station, 469 time=core.make_timestamp(time, target_date=issued), 470 remarks=remarks, 471 remarks_info=parse_remarks(remarks), 472 forecast=parsed_lines, 473 start_time=start_time, 474 end_time=end_time, 475 is_amended=is_amended, 476 is_correction=is_correction, 477 max_temp=max_temp, 478 min_temp=min_temp, 479 alts=alts, 480 temps=temps, 481 ) 482 return struct, units, sans
Return TafData and Units dataclasses with parsed data and their associated units.
485def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]: 486 """Return a list of parsed line dictionaries.""" 487 parsed_lines: list[TafLineData] = [] 488 prob = "" 489 while lines: 490 raw_line = lines[0].strip() 491 line = sanitize_line(raw_line, sans) 492 # Remove prob from the beginning of a line 493 if line.startswith("PROB"): 494 # Add standalone prob to next line 495 if len(line) == 6: 496 prob = line 497 line = "" 498 # Add to current line 499 elif len(line) > 6: 500 prob = line[:6] 501 line = line[6:].strip() 502 if line: 503 parsed_line = parse_line(line, units, sans, issued) 504 parsed_line.probability = None if " " in prob else core.make_number(prob[4:]) 505 parsed_line.raw = raw_line 506 if prob: 507 parsed_line.sanitized = f"{prob} {parsed_line.sanitized}" 508 prob = "" 509 parsed_lines.append(parsed_line) 510 lines.pop(0) 511 return parsed_lines
Return a list of parsed line dictionaries.
514def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData: 515 """Parser for the International TAF forcast variant.""" 516 data: list[str] = core.dedupe(line.split()) 517 # Grab original time piece under certain conditions to preserve a useful slash 518 old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None 519 data = clean_taf_list(data, sans) 520 if old_time and len(data) > 1 and data[1] == old_time.strip("/"): 521 data[1] = old_time 522 sanitized = " ".join(data) 523 data, report_type, start_time, end_time, transition = get_type_and_times(data) 524 data, wind_shear = get_wind_shear(data) 525 ( 526 data, 527 wind_direction, 528 wind_speed, 529 wind_gust, 530 wind_variable_direction, 531 ) = core.get_wind(data, units) 532 if "CAVOK" in data: 533 visibility = core.make_number("CAVOK") 534 clouds: list[Cloud] = [] 535 data.pop(data.index("CAVOK")) 536 else: 537 data, visibility = core.get_visibility(data, units) 538 data, clouds = core.get_clouds(data) 539 other, altimeter, icing, turbulence = get_alt_ice_turb(data) 540 return TafLineData( 541 altimeter=altimeter, 542 clouds=clouds, 543 flight_rules="", 544 other=other, 545 visibility=visibility, 546 wind_direction=wind_direction, 547 wind_gust=wind_gust, 548 wind_speed=wind_speed, 549 wx_codes=[], 550 end_time=core.make_timestamp(end_time, target_date=issued), 551 icing=icing, 552 probability=None, 553 raw=line, 554 sanitized=sanitized, 555 start_time=core.make_timestamp(start_time, target_date=issued), 556 transition_start=core.make_timestamp(transition, target_date=issued), 557 turbulence=turbulence, 558 type=report_type, 559 wind_shear=wind_shear, 560 wind_variable_direction=wind_variable_direction, 561 )
Parser for the International TAF forcast variant.