avwx.current.taf
A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 5 statute miles from the reporting station. They are update once every three or six hours or when significant changes warrant an update, and the observations are valid for six hours or until the next report is issued
1""" 2A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 35 statute miles from the reporting station. They are update once every three or 4six hours or when significant changes warrant an update, and the observations 5are valid for six hours or until the next report is issued 6""" 7 8# stdlib 9from __future__ import annotations 10 11from contextlib import suppress 12from typing import TYPE_CHECKING 13 14# module 15from avwx.current.base import Report, get_wx_codes 16from avwx.parsing import core, speech, summary 17from avwx.parsing.remarks import parse as parse_remarks 18from avwx.parsing.sanitization.taf import clean_taf_list, clean_taf_string 19from avwx.parsing.translate.taf import translate_taf 20from avwx.static.core import FLIGHT_RULES 21from avwx.static.taf import TAF_NEWLINE, TAF_NEWLINE_STARTSWITH, TAF_RMK 22from avwx.station import uses_na_format, valid_station 23from avwx.structs import ( 24 Cloud, 25 Number, 26 Sanitization, 27 TafData, 28 TafLineData, 29 TafTrans, 30 Timestamp, 31 Units, 32) 33 34if TYPE_CHECKING: 35 from datetime import date 36 37 38class Taf(Report): 39 """ 40 The Taf class offers an object-oriented approach to managing TAF data for a 41 single station. 42 43 ```python 44 >>> from avwx import Taf 45 >>> kjfk = Taf("KJFK") 46 >>> kjfk.station.name 47 'John F Kennedy International Airport' 48 >>> kjfk.update() 49 True 50 >>> kjfk.last_updated 51 datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc) 52 >>> kjfk.raw 53 'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035' 54 >>> len(kjfk.data.forecast) 55 3 56 >>> kjfk.data.forecast[0].flight_rules 57 'VFR' 58 >>> kjfk.translations.forecast[0].wind 59 'NNW-330 at 16kt gusting to 27kt' 60 >>> kjfk.speech 61 'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft' 62 ``` 63 64 The `parse` and `from_report` methods can parse a report string if you want 65 to override the normal fetching process. 66 67 ```python 68 >>> from avwx import Taf 69 >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z" 70 >>> zyhb = Taf.from_report(report) 71 True 72 >>> zyhb.station.city 73 'Hulan' 74 >>> zyhb.data.remarks 75 'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z' 76 >>> zyhb.summary[-1] 77 'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft' 78 ``` 79 """ 80 81 data: TafData | None = None 82 translations: TafTrans | None = None # type: ignore 83 84 async def _post_update(self) -> None: 85 if self.code is None or self.raw is None: 86 return 87 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 88 if self.data is None or self.units is None: 89 return 90 self.translations = translate_taf(self.data, self.units) 91 92 def _post_parse(self) -> None: 93 if self.code is None or self.raw is None: 94 return 95 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 96 if self.data is None or self.units is None: 97 return 98 self.translations = translate_taf(self.data, self.units) 99 100 @property 101 def summary(self) -> list[str]: 102 """Condensed summary for each forecast created from translations.""" 103 if not self.translations: 104 self.update() 105 if self.translations is None or self.translations.forecast is None: 106 return [] 107 return [summary.taf(trans) for trans in self.translations.forecast] 108 109 @property 110 def speech(self) -> str | None: 111 """Report summary designed to be read by a text-to-speech program.""" 112 if not self.data: 113 self.update() 114 if self.data is None or self.units is None: 115 return None 116 return speech.taf(self.data, self.units) 117 118 119LINE_FIXES = { 120 "TEMP0": "TEMPO", 121 "TEMP O": "TEMPO", 122 "TMPO": "TEMPO", 123 "TE MPO": "TEMPO", 124 "TEMP ": "TEMPO ", 125 "T EMPO": "TEMPO", 126 " EMPO": " TEMPO", 127 "TEMO": "TEMPO", 128 "BECM G": "BECMG", 129 "BEMCG": "BECMG", 130 "BE CMG": "BECMG", 131 "B ECMG": "BECMG", 132 " BEC ": " BECMG ", 133 "BCEMG": "BECMG", 134 "BEMG": "BECMG", 135} 136 137 138def sanitize_line(txt: str, sans: Sanitization) -> str: 139 """Fix common mistakes with 'new line' signifiers so that they can be recognized.""" 140 for key, fix in LINE_FIXES.items(): 141 if key in txt: 142 txt = txt.replace(key, fix) 143 sans.log(key, fix) 144 # Fix when space is missing following new line signifiers 145 for item in ["BECMG", "TEMPO"]: 146 if item in txt and f"{item} " not in txt: 147 index = txt.find(item) + len(item) 148 txt = f"{txt[:index]} {txt[index:]}" 149 sans.extra_spaces_needed = True 150 return txt 151 152 153def get_taf_remarks(txt: str) -> tuple[str, str]: 154 """Return report and remarks separated if found.""" 155 remarks_start = core.find_first_in_list(txt, TAF_RMK) 156 if remarks_start == -1: 157 return txt, "" 158 remarks = txt[remarks_start:] 159 txt = txt[:remarks_start].strip() 160 return txt, remarks 161 162 163def get_alt_ice_turb( 164 data: list[str], 165) -> tuple[list[str], Number | None, list[str], list[str]]: 166 """Return the report list and removed: Altimeter string, Icing list, Turbulence list.""" 167 altimeter_number = None 168 icing, turbulence = [], [] 169 for i, item in reversed(list(enumerate(data))): 170 if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit(): 171 altimeter = data.pop(i)[3:7] 172 if altimeter[0] in ("2", "3"): 173 altimeter = f"{altimeter[:2]}.{altimeter[2:]}" 174 altimeter_number = core.make_number(altimeter, literal=True) 175 elif item.isdigit(): 176 if item[0] == "6": 177 icing.append(data.pop(i)) 178 elif item[0] == "5": 179 turbulence.append(data.pop(i)) 180 return data, altimeter_number, icing, turbulence 181 182 183def is_normal_time(item: str) -> bool: 184 """Return if the item looks like a valid TAF (1200/1400) time range.""" 185 return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit() 186 187 188def starts_new_line(item: str) -> bool: 189 """Returns True if the given element should start a new report line""" 190 if item in TAF_NEWLINE: 191 return True 192 return any(item.startswith(start) for start in TAF_NEWLINE_STARTSWITH) 193 194 195def split_taf(txt: str) -> list[str]: 196 """Split a TAF report into each distinct time period.""" 197 lines = [] 198 split = txt.split() 199 last_index = 0 200 e_splits = enumerate(split) 201 next(e_splits) 202 for i, item in e_splits: 203 if (starts_new_line(item) and not split[i - 1].startswith("PROB")) or ( 204 is_normal_time(item) and not starts_new_line(split[i - 1]) 205 ): 206 lines.append(" ".join(split[last_index:i])) 207 last_index = i 208 lines.append(" ".join(split[last_index:])) 209 return lines 210 211 212# TAF line report type and start/end times 213def get_type_and_times( 214 data: list[str], 215) -> tuple[list[str], str, str | None, str | None, str | None]: 216 """Extract the report type string, start time string, and end time string.""" 217 report_type, start_time, end_time, transition = "FROM", None, None, None 218 # TEMPO, BECMG, INTER 219 if data and data[0] in TAF_NEWLINE or len(data[0]) == 6 and data[0].startswith("PROB"): 220 report_type = data.pop(0) 221 if data: 222 item, length = data[0], len(data[0]) 223 # 1200/1306 224 if is_normal_time(item): 225 start_time, end_time = data.pop(0).split("/") 226 227 # 1200 1306 228 elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit(): 229 start_time = data.pop(0) 230 end_time = data.pop(0) 231 232 # 120000 233 elif length == 6 and item.isdigit() and item[-2:] == "00": 234 start_time = data.pop(0)[:4] 235 # FM120000 236 elif length > 7 and item.startswith("FM"): 237 report_type = "FROM" 238 if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit(): 239 start_time, end_time = data.pop(0)[2:].split("/") 240 elif item[2:8].isdigit(): 241 start_time = data.pop(0)[2:6] 242 # TL120600 243 if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit(): 244 end_time = data.pop(0)[2:6] 245 elif report_type == "BECMG" and length == 5: 246 # 1200/ 247 if item[-1] == "/" and item[:4].isdigit(): 248 start_time = data.pop(0)[:4] 249 # /1200 250 elif item[0] == "/" and item[1:].isdigit(): 251 end_time = data.pop(0)[1:] 252 if report_type == "BECMG": 253 transition, start_time, end_time = start_time, end_time, None 254 return data, report_type, start_time, end_time, transition 255 256 257def _is_tempo_or_prob(line: TafLineData) -> bool: 258 """Return True if report type is TEMPO or non-null probability.""" 259 return line.type == "TEMPO" or line.probability is not None 260 261 262def _get_next_time(lines: list[TafLineData], target: str) -> Timestamp | None: 263 """Returns the next normal time target value or empty""" 264 for line in lines: 265 if _is_tempo_or_prob(line): 266 continue 267 time = line.transition_start or getattr(line, target) if target == "start_time" else getattr(line, target) 268 if time: 269 return time # type: ignore 270 return None 271 272 273def find_missing_taf_times( 274 lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None 275) -> list[TafLineData]: 276 """Fix any missing time issues except for error/empty lines.""" 277 if not lines: 278 return lines 279 # Assign start time 280 lines[0].start_time = start 281 # Fix other times 282 last_fm_line = 0 283 for i, line in enumerate(lines): 284 if _is_tempo_or_prob(line): 285 continue 286 last_fm_line = i 287 # Search remaining lines to fill empty end or previous for empty start 288 for target, other, direc in (("start", "end", -1), ("end", "start", 1)): 289 target += "_time" # noqa: PLW2901 290 if not getattr(line, target): 291 setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")) 292 # Special case for final forcast 293 if last_fm_line: 294 lines[last_fm_line].end_time = end 295 # Reset original end time if still empty 296 if lines and not lines[0].end_time: 297 lines[0].end_time = end 298 return lines 299 300 301def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]: 302 """Return the report list and the remove wind shear.""" 303 shear = None 304 for i, item in reversed(list(enumerate(data))): 305 if len(item) > 6 and item.startswith("WS") and item[5] == "/": 306 shear = data.pop(i).replace("KT", "") 307 return data, shear 308 309 310def get_temp_min_and_max( 311 data: list[str], 312) -> tuple[list[str], str | None, str | None]: 313 """Pull out Max temp at time and Min temp at time items from wx list.""" 314 temp_max, temp_min = "", "" 315 for i, item in reversed(list(enumerate(data))): 316 if len(item) > 6 and item[0] == "T" and "/" in item: 317 # TX12/1316Z 318 if item[1] == "X": 319 temp_max = data.pop(i) 320 # TNM03/1404Z 321 elif item[1] == "N": 322 temp_min = data.pop(i) 323 # TM03/1404Z T12/1316Z -> Will fix TN/TX 324 elif item[1] == "M" or item[1].isdigit(): 325 if temp_min: 326 if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int( 327 item[1 : item.find("/")].replace("M", "-") 328 ): 329 temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}" 330 else: 331 temp_max = f"TX{item[1:]}" 332 else: 333 temp_min = f"TN{item[1:]}" 334 data.pop(i) 335 return data, temp_max or None, temp_min or None 336 337 338def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]: 339 """Get Temperature and Altimeter lists for Oceania TAFs.""" 340 tlist: list[str] = [] 341 qlist: list[str] = [] 342 if "T" in data: 343 data, tlist = core.get_digit_list(data, data.index("T")) 344 if "Q" in data: 345 data, qlist = core.get_digit_list(data, data.index("Q")) 346 return data, tlist, qlist 347 348 349def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]: 350 """Get flight rules by looking for missing data in prior reports.""" 351 for i, line in enumerate(lines): 352 temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False 353 for report in reversed(lines[: i + 1]): 354 if not _is_tempo_or_prob(report): 355 if not temp_vis: 356 temp_vis = report.visibility 357 # SKC or CLR should force no clouds instead of looking back 358 if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK": 359 is_clear = True 360 elif temp_cloud == []: 361 temp_cloud = report.clouds 362 if temp_vis and temp_cloud != []: 363 break 364 if is_clear: 365 temp_cloud = [] 366 line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))] 367 return lines 368 369 370def fix_report_header(report: str) -> str: 371 """Correct the header order for key elements.""" 372 split_report = report.split() 373 374 # Limit scope to only the first few elements. Remarks may include similar tokens 375 header_length = min(len(split_report), 6) 376 headers = split_report[:header_length] 377 378 fixed_headers = [] 379 for target in ("TAF", "AMD", "COR"): 380 with suppress(ValueError): 381 headers.remove(target) 382 fixed_headers.append(target) 383 384 return " ".join(fixed_headers + headers + split_report[header_length:]) 385 386 387def _is_possible_start_end_time_slash(item: str) -> bool: 388 """Return True if item is a possible period start or end with missing element.""" 389 return len(item) == 5 and ( 390 # 1200/ 391 (item[-1] == "/" and item[:4].isdigit()) 392 or 393 # /1200 394 (item[0] == "/" and item[1:].isdigit()) 395 ) 396 397 398def parse( 399 station: str, report: str, issued: date | None = None 400) -> tuple[TafData | None, Units | None, Sanitization | None]: 401 """Return TafData and Units dataclasses with parsed data and their associated units.""" 402 if not report: 403 return None, None, None 404 valid_station(station) 405 report = fix_report_header(report) 406 while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "): 407 report = report[4:] 408 start_time: Timestamp | None = None 409 end_time: Timestamp | None = None 410 sans = Sanitization() 411 sanitized = clean_taf_string(report, sans) 412 _, new_station, time = core.get_station_and_time(sanitized[:20].split()) 413 if new_station is not None: 414 station = new_station 415 sanitized = sanitized.replace(station, "") 416 if time: 417 sanitized = sanitized.replace(time, "").strip() 418 units = Units.north_american() if uses_na_format(station) else Units.international() 419 # Find and remove remarks 420 sanitized, remarks = get_taf_remarks(sanitized) 421 # Split and parse each line 422 lines = split_taf(sanitized) 423 parsed_lines = parse_lines(lines, units, sans, issued) 424 # Perform additional info extract and corrections 425 max_temp: str | None = None 426 min_temp: str | None = None 427 if parsed_lines: 428 ( 429 parsed_lines[-1].other, 430 max_temp, 431 min_temp, 432 ) = get_temp_min_and_max(parsed_lines[-1].other) 433 if not (max_temp or min_temp): 434 ( 435 parsed_lines[0].other, 436 max_temp, 437 min_temp, 438 ) = get_temp_min_and_max(parsed_lines[0].other) 439 # Set start and end times based on the first line 440 start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time 441 parsed_lines[0].end_time = None 442 parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time) 443 parsed_lines = get_taf_flight_rules(parsed_lines) 444 # Extract Oceania-specific data 445 alts: list[str] | None = None 446 temps: list[str] | None = None 447 if station[0] == "A": 448 ( 449 parsed_lines[-1].other, 450 alts, 451 temps, 452 ) = get_oceania_temp_and_alt(parsed_lines[-1].other) 453 # Convert wx codes 454 for line in parsed_lines: 455 line.other, line.wx_codes = get_wx_codes(line.other) 456 sanitized = " ".join(i for i in (station, time, sanitized) if i) 457 struct = TafData( 458 raw=report, 459 sanitized=sanitized, 460 station=station, 461 time=core.make_timestamp(time, target_date=issued), 462 remarks=remarks, 463 remarks_info=parse_remarks(remarks), 464 forecast=parsed_lines, 465 start_time=start_time, 466 end_time=end_time, 467 max_temp=max_temp, 468 min_temp=min_temp, 469 alts=alts, 470 temps=temps, 471 ) 472 return struct, units, sans 473 474 475def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]: 476 """Return a list of parsed line dictionaries.""" 477 parsed_lines: list[TafLineData] = [] 478 prob = "" 479 while lines: 480 raw_line = lines[0].strip() 481 line = sanitize_line(raw_line, sans) 482 # Remove prob from the beginning of a line 483 if line.startswith("PROB"): 484 # Add standalone prob to next line 485 if len(line) == 6: 486 prob = line 487 line = "" 488 # Add to current line 489 elif len(line) > 6: 490 prob = line[:6] 491 line = line[6:].strip() 492 if line: 493 parsed_line = parse_line(line, units, sans, issued) 494 parsed_line.probability = None if " " in prob else core.make_number(prob[4:]) 495 parsed_line.raw = raw_line 496 if prob: 497 parsed_line.sanitized = f"{prob} {parsed_line.sanitized}" 498 prob = "" 499 parsed_lines.append(parsed_line) 500 lines.pop(0) 501 return parsed_lines 502 503 504def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData: 505 """Parser for the International TAF forcast variant.""" 506 data: list[str] = core.dedupe(line.split()) 507 # Grab original time piece under certain conditions to preserve a useful slash 508 old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None 509 data = clean_taf_list(data, sans) 510 if old_time and len(data) > 1 and data[1] == old_time.strip("/"): 511 data[1] = old_time 512 sanitized = " ".join(data) 513 data, report_type, start_time, end_time, transition = get_type_and_times(data) 514 data, wind_shear = get_wind_shear(data) 515 ( 516 data, 517 wind_direction, 518 wind_speed, 519 wind_gust, 520 wind_variable_direction, 521 ) = core.get_wind(data, units) 522 if "CAVOK" in data: 523 visibility = core.make_number("CAVOK") 524 clouds: list[Cloud] = [] 525 data.pop(data.index("CAVOK")) 526 else: 527 data, visibility = core.get_visibility(data, units) 528 data, clouds = core.get_clouds(data) 529 other, altimeter, icing, turbulence = get_alt_ice_turb(data) 530 return TafLineData( 531 altimeter=altimeter, 532 clouds=clouds, 533 flight_rules="", 534 other=other, 535 visibility=visibility, 536 wind_direction=wind_direction, 537 wind_gust=wind_gust, 538 wind_speed=wind_speed, 539 wx_codes=[], 540 end_time=core.make_timestamp(end_time, target_date=issued), 541 icing=icing, 542 probability=None, 543 raw=line, 544 sanitized=sanitized, 545 start_time=core.make_timestamp(start_time, target_date=issued), 546 transition_start=core.make_timestamp(transition, target_date=issued), 547 turbulence=turbulence, 548 type=report_type, 549 wind_shear=wind_shear, 550 wind_variable_direction=wind_variable_direction, 551 )
39class Taf(Report): 40 """ 41 The Taf class offers an object-oriented approach to managing TAF data for a 42 single station. 43 44 ```python 45 >>> from avwx import Taf 46 >>> kjfk = Taf("KJFK") 47 >>> kjfk.station.name 48 'John F Kennedy International Airport' 49 >>> kjfk.update() 50 True 51 >>> kjfk.last_updated 52 datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc) 53 >>> kjfk.raw 54 'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035' 55 >>> len(kjfk.data.forecast) 56 3 57 >>> kjfk.data.forecast[0].flight_rules 58 'VFR' 59 >>> kjfk.translations.forecast[0].wind 60 'NNW-330 at 16kt gusting to 27kt' 61 >>> kjfk.speech 62 'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft' 63 ``` 64 65 The `parse` and `from_report` methods can parse a report string if you want 66 to override the normal fetching process. 67 68 ```python 69 >>> from avwx import Taf 70 >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z" 71 >>> zyhb = Taf.from_report(report) 72 True 73 >>> zyhb.station.city 74 'Hulan' 75 >>> zyhb.data.remarks 76 'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z' 77 >>> zyhb.summary[-1] 78 'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft' 79 ``` 80 """ 81 82 data: TafData | None = None 83 translations: TafTrans | None = None # type: ignore 84 85 async def _post_update(self) -> None: 86 if self.code is None or self.raw is None: 87 return 88 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 89 if self.data is None or self.units is None: 90 return 91 self.translations = translate_taf(self.data, self.units) 92 93 def _post_parse(self) -> None: 94 if self.code is None or self.raw is None: 95 return 96 self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued) 97 if self.data is None or self.units is None: 98 return 99 self.translations = translate_taf(self.data, self.units) 100 101 @property 102 def summary(self) -> list[str]: 103 """Condensed summary for each forecast created from translations.""" 104 if not self.translations: 105 self.update() 106 if self.translations is None or self.translations.forecast is None: 107 return [] 108 return [summary.taf(trans) for trans in self.translations.forecast] 109 110 @property 111 def speech(self) -> str | None: 112 """Report summary designed to be read by a text-to-speech program.""" 113 if not self.data: 114 self.update() 115 if self.data is None or self.units is None: 116 return None 117 return speech.taf(self.data, self.units)
The Taf class offers an object-oriented approach to managing TAF data for a single station.
>>> from avwx import Taf
>>> kjfk = Taf("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
>>> kjfk.raw
'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
>>> len(kjfk.data.forecast)
3
>>> kjfk.data.forecast[0].flight_rules
'VFR'
>>> kjfk.translations.forecast[0].wind
'NNW-330 at 16kt gusting to 27kt'
>>> kjfk.speech
'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
The parse
and from_report
methods can parse a report string if you want
to override the normal fetching process.
>>> from avwx import Taf
>>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
>>> zyhb = Taf.from_report(report)
True
>>> zyhb.station.city
'Hulan'
>>> zyhb.data.remarks
'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
>>> zyhb.summary[-1]
'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
101 @property 102 def summary(self) -> list[str]: 103 """Condensed summary for each forecast created from translations.""" 104 if not self.translations: 105 self.update() 106 if self.translations is None or self.translations.forecast is None: 107 return [] 108 return [summary.taf(trans) for trans in self.translations.forecast]
Condensed summary for each forecast created from translations.
110 @property 111 def speech(self) -> str | None: 112 """Report summary designed to be read by a text-to-speech program.""" 113 if not self.data: 114 self.update() 115 if self.data is None or self.units is None: 116 return None 117 return speech.taf(self.data, self.units)
Report summary designed to be read by a text-to-speech program.
139def sanitize_line(txt: str, sans: Sanitization) -> str: 140 """Fix common mistakes with 'new line' signifiers so that they can be recognized.""" 141 for key, fix in LINE_FIXES.items(): 142 if key in txt: 143 txt = txt.replace(key, fix) 144 sans.log(key, fix) 145 # Fix when space is missing following new line signifiers 146 for item in ["BECMG", "TEMPO"]: 147 if item in txt and f"{item} " not in txt: 148 index = txt.find(item) + len(item) 149 txt = f"{txt[:index]} {txt[index:]}" 150 sans.extra_spaces_needed = True 151 return txt
Fix common mistakes with 'new line' signifiers so that they can be recognized.
154def get_taf_remarks(txt: str) -> tuple[str, str]: 155 """Return report and remarks separated if found.""" 156 remarks_start = core.find_first_in_list(txt, TAF_RMK) 157 if remarks_start == -1: 158 return txt, "" 159 remarks = txt[remarks_start:] 160 txt = txt[:remarks_start].strip() 161 return txt, remarks
Return report and remarks separated if found.
164def get_alt_ice_turb( 165 data: list[str], 166) -> tuple[list[str], Number | None, list[str], list[str]]: 167 """Return the report list and removed: Altimeter string, Icing list, Turbulence list.""" 168 altimeter_number = None 169 icing, turbulence = [], [] 170 for i, item in reversed(list(enumerate(data))): 171 if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit(): 172 altimeter = data.pop(i)[3:7] 173 if altimeter[0] in ("2", "3"): 174 altimeter = f"{altimeter[:2]}.{altimeter[2:]}" 175 altimeter_number = core.make_number(altimeter, literal=True) 176 elif item.isdigit(): 177 if item[0] == "6": 178 icing.append(data.pop(i)) 179 elif item[0] == "5": 180 turbulence.append(data.pop(i)) 181 return data, altimeter_number, icing, turbulence
Return the report list and removed: Altimeter string, Icing list, Turbulence list.
184def is_normal_time(item: str) -> bool: 185 """Return if the item looks like a valid TAF (1200/1400) time range.""" 186 return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit()
Return if the item looks like a valid TAF (1200/1400) time range.
189def starts_new_line(item: str) -> bool: 190 """Returns True if the given element should start a new report line""" 191 if item in TAF_NEWLINE: 192 return True 193 return any(item.startswith(start) for start in TAF_NEWLINE_STARTSWITH)
Returns True if the given element should start a new report line
196def split_taf(txt: str) -> list[str]: 197 """Split a TAF report into each distinct time period.""" 198 lines = [] 199 split = txt.split() 200 last_index = 0 201 e_splits = enumerate(split) 202 next(e_splits) 203 for i, item in e_splits: 204 if (starts_new_line(item) and not split[i - 1].startswith("PROB")) or ( 205 is_normal_time(item) and not starts_new_line(split[i - 1]) 206 ): 207 lines.append(" ".join(split[last_index:i])) 208 last_index = i 209 lines.append(" ".join(split[last_index:])) 210 return lines
Split a TAF report into each distinct time period.
214def get_type_and_times( 215 data: list[str], 216) -> tuple[list[str], str, str | None, str | None, str | None]: 217 """Extract the report type string, start time string, and end time string.""" 218 report_type, start_time, end_time, transition = "FROM", None, None, None 219 # TEMPO, BECMG, INTER 220 if data and data[0] in TAF_NEWLINE or len(data[0]) == 6 and data[0].startswith("PROB"): 221 report_type = data.pop(0) 222 if data: 223 item, length = data[0], len(data[0]) 224 # 1200/1306 225 if is_normal_time(item): 226 start_time, end_time = data.pop(0).split("/") 227 228 # 1200 1306 229 elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit(): 230 start_time = data.pop(0) 231 end_time = data.pop(0) 232 233 # 120000 234 elif length == 6 and item.isdigit() and item[-2:] == "00": 235 start_time = data.pop(0)[:4] 236 # FM120000 237 elif length > 7 and item.startswith("FM"): 238 report_type = "FROM" 239 if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit(): 240 start_time, end_time = data.pop(0)[2:].split("/") 241 elif item[2:8].isdigit(): 242 start_time = data.pop(0)[2:6] 243 # TL120600 244 if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit(): 245 end_time = data.pop(0)[2:6] 246 elif report_type == "BECMG" and length == 5: 247 # 1200/ 248 if item[-1] == "/" and item[:4].isdigit(): 249 start_time = data.pop(0)[:4] 250 # /1200 251 elif item[0] == "/" and item[1:].isdigit(): 252 end_time = data.pop(0)[1:] 253 if report_type == "BECMG": 254 transition, start_time, end_time = start_time, end_time, None 255 return data, report_type, start_time, end_time, transition
Extract the report type string, start time string, and end time string.
274def find_missing_taf_times( 275 lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None 276) -> list[TafLineData]: 277 """Fix any missing time issues except for error/empty lines.""" 278 if not lines: 279 return lines 280 # Assign start time 281 lines[0].start_time = start 282 # Fix other times 283 last_fm_line = 0 284 for i, line in enumerate(lines): 285 if _is_tempo_or_prob(line): 286 continue 287 last_fm_line = i 288 # Search remaining lines to fill empty end or previous for empty start 289 for target, other, direc in (("start", "end", -1), ("end", "start", 1)): 290 target += "_time" # noqa: PLW2901 291 if not getattr(line, target): 292 setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")) 293 # Special case for final forcast 294 if last_fm_line: 295 lines[last_fm_line].end_time = end 296 # Reset original end time if still empty 297 if lines and not lines[0].end_time: 298 lines[0].end_time = end 299 return lines
Fix any missing time issues except for error/empty lines.
302def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]: 303 """Return the report list and the remove wind shear.""" 304 shear = None 305 for i, item in reversed(list(enumerate(data))): 306 if len(item) > 6 and item.startswith("WS") and item[5] == "/": 307 shear = data.pop(i).replace("KT", "") 308 return data, shear
Return the report list and the remove wind shear.
311def get_temp_min_and_max( 312 data: list[str], 313) -> tuple[list[str], str | None, str | None]: 314 """Pull out Max temp at time and Min temp at time items from wx list.""" 315 temp_max, temp_min = "", "" 316 for i, item in reversed(list(enumerate(data))): 317 if len(item) > 6 and item[0] == "T" and "/" in item: 318 # TX12/1316Z 319 if item[1] == "X": 320 temp_max = data.pop(i) 321 # TNM03/1404Z 322 elif item[1] == "N": 323 temp_min = data.pop(i) 324 # TM03/1404Z T12/1316Z -> Will fix TN/TX 325 elif item[1] == "M" or item[1].isdigit(): 326 if temp_min: 327 if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int( 328 item[1 : item.find("/")].replace("M", "-") 329 ): 330 temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}" 331 else: 332 temp_max = f"TX{item[1:]}" 333 else: 334 temp_min = f"TN{item[1:]}" 335 data.pop(i) 336 return data, temp_max or None, temp_min or None
Pull out Max temp at time and Min temp at time items from wx list.
339def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]: 340 """Get Temperature and Altimeter lists for Oceania TAFs.""" 341 tlist: list[str] = [] 342 qlist: list[str] = [] 343 if "T" in data: 344 data, tlist = core.get_digit_list(data, data.index("T")) 345 if "Q" in data: 346 data, qlist = core.get_digit_list(data, data.index("Q")) 347 return data, tlist, qlist
Get Temperature and Altimeter lists for Oceania TAFs.
350def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]: 351 """Get flight rules by looking for missing data in prior reports.""" 352 for i, line in enumerate(lines): 353 temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False 354 for report in reversed(lines[: i + 1]): 355 if not _is_tempo_or_prob(report): 356 if not temp_vis: 357 temp_vis = report.visibility 358 # SKC or CLR should force no clouds instead of looking back 359 if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK": 360 is_clear = True 361 elif temp_cloud == []: 362 temp_cloud = report.clouds 363 if temp_vis and temp_cloud != []: 364 break 365 if is_clear: 366 temp_cloud = [] 367 line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))] 368 return lines
Get flight rules by looking for missing data in prior reports.
371def fix_report_header(report: str) -> str: 372 """Correct the header order for key elements.""" 373 split_report = report.split() 374 375 # Limit scope to only the first few elements. Remarks may include similar tokens 376 header_length = min(len(split_report), 6) 377 headers = split_report[:header_length] 378 379 fixed_headers = [] 380 for target in ("TAF", "AMD", "COR"): 381 with suppress(ValueError): 382 headers.remove(target) 383 fixed_headers.append(target) 384 385 return " ".join(fixed_headers + headers + split_report[header_length:])
Correct the header order for key elements.
399def parse( 400 station: str, report: str, issued: date | None = None 401) -> tuple[TafData | None, Units | None, Sanitization | None]: 402 """Return TafData and Units dataclasses with parsed data and their associated units.""" 403 if not report: 404 return None, None, None 405 valid_station(station) 406 report = fix_report_header(report) 407 while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "): 408 report = report[4:] 409 start_time: Timestamp | None = None 410 end_time: Timestamp | None = None 411 sans = Sanitization() 412 sanitized = clean_taf_string(report, sans) 413 _, new_station, time = core.get_station_and_time(sanitized[:20].split()) 414 if new_station is not None: 415 station = new_station 416 sanitized = sanitized.replace(station, "") 417 if time: 418 sanitized = sanitized.replace(time, "").strip() 419 units = Units.north_american() if uses_na_format(station) else Units.international() 420 # Find and remove remarks 421 sanitized, remarks = get_taf_remarks(sanitized) 422 # Split and parse each line 423 lines = split_taf(sanitized) 424 parsed_lines = parse_lines(lines, units, sans, issued) 425 # Perform additional info extract and corrections 426 max_temp: str | None = None 427 min_temp: str | None = None 428 if parsed_lines: 429 ( 430 parsed_lines[-1].other, 431 max_temp, 432 min_temp, 433 ) = get_temp_min_and_max(parsed_lines[-1].other) 434 if not (max_temp or min_temp): 435 ( 436 parsed_lines[0].other, 437 max_temp, 438 min_temp, 439 ) = get_temp_min_and_max(parsed_lines[0].other) 440 # Set start and end times based on the first line 441 start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time 442 parsed_lines[0].end_time = None 443 parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time) 444 parsed_lines = get_taf_flight_rules(parsed_lines) 445 # Extract Oceania-specific data 446 alts: list[str] | None = None 447 temps: list[str] | None = None 448 if station[0] == "A": 449 ( 450 parsed_lines[-1].other, 451 alts, 452 temps, 453 ) = get_oceania_temp_and_alt(parsed_lines[-1].other) 454 # Convert wx codes 455 for line in parsed_lines: 456 line.other, line.wx_codes = get_wx_codes(line.other) 457 sanitized = " ".join(i for i in (station, time, sanitized) if i) 458 struct = TafData( 459 raw=report, 460 sanitized=sanitized, 461 station=station, 462 time=core.make_timestamp(time, target_date=issued), 463 remarks=remarks, 464 remarks_info=parse_remarks(remarks), 465 forecast=parsed_lines, 466 start_time=start_time, 467 end_time=end_time, 468 max_temp=max_temp, 469 min_temp=min_temp, 470 alts=alts, 471 temps=temps, 472 ) 473 return struct, units, sans
Return TafData and Units dataclasses with parsed data and their associated units.
476def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]: 477 """Return a list of parsed line dictionaries.""" 478 parsed_lines: list[TafLineData] = [] 479 prob = "" 480 while lines: 481 raw_line = lines[0].strip() 482 line = sanitize_line(raw_line, sans) 483 # Remove prob from the beginning of a line 484 if line.startswith("PROB"): 485 # Add standalone prob to next line 486 if len(line) == 6: 487 prob = line 488 line = "" 489 # Add to current line 490 elif len(line) > 6: 491 prob = line[:6] 492 line = line[6:].strip() 493 if line: 494 parsed_line = parse_line(line, units, sans, issued) 495 parsed_line.probability = None if " " in prob else core.make_number(prob[4:]) 496 parsed_line.raw = raw_line 497 if prob: 498 parsed_line.sanitized = f"{prob} {parsed_line.sanitized}" 499 prob = "" 500 parsed_lines.append(parsed_line) 501 lines.pop(0) 502 return parsed_lines
Return a list of parsed line dictionaries.
505def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData: 506 """Parser for the International TAF forcast variant.""" 507 data: list[str] = core.dedupe(line.split()) 508 # Grab original time piece under certain conditions to preserve a useful slash 509 old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None 510 data = clean_taf_list(data, sans) 511 if old_time and len(data) > 1 and data[1] == old_time.strip("/"): 512 data[1] = old_time 513 sanitized = " ".join(data) 514 data, report_type, start_time, end_time, transition = get_type_and_times(data) 515 data, wind_shear = get_wind_shear(data) 516 ( 517 data, 518 wind_direction, 519 wind_speed, 520 wind_gust, 521 wind_variable_direction, 522 ) = core.get_wind(data, units) 523 if "CAVOK" in data: 524 visibility = core.make_number("CAVOK") 525 clouds: list[Cloud] = [] 526 data.pop(data.index("CAVOK")) 527 else: 528 data, visibility = core.get_visibility(data, units) 529 data, clouds = core.get_clouds(data) 530 other, altimeter, icing, turbulence = get_alt_ice_turb(data) 531 return TafLineData( 532 altimeter=altimeter, 533 clouds=clouds, 534 flight_rules="", 535 other=other, 536 visibility=visibility, 537 wind_direction=wind_direction, 538 wind_gust=wind_gust, 539 wind_speed=wind_speed, 540 wx_codes=[], 541 end_time=core.make_timestamp(end_time, target_date=issued), 542 icing=icing, 543 probability=None, 544 raw=line, 545 sanitized=sanitized, 546 start_time=core.make_timestamp(start_time, target_date=issued), 547 transition_start=core.make_timestamp(transition, target_date=issued), 548 turbulence=turbulence, 549 type=report_type, 550 wind_shear=wind_shear, 551 wind_variable_direction=wind_variable_direction, 552 )
Parser for the International TAF forcast variant.