avwx.current.taf
A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area within 5 statute miles of the reporting station. They are updated once every three or six hours, or when significant changes warrant an update, and the observations are valid for six hours or until the next report is issued.
"""
A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area
5 statute miles from the reporting station. They are updated once every three or
six hours or when significant changes warrant an update, and the observations
are valid for six hours or until the next report is issued
"""

# stdlib
from contextlib import suppress
from datetime import date
from typing import List, Tuple, Optional

# module
from avwx.current.base import Report, get_wx_codes
from avwx.parsing import core, speech, summary
from avwx.parsing.remarks import parse as parse_remarks
from avwx.parsing.sanitization.taf import clean_taf_list, clean_taf_string
from avwx.parsing.translate.taf import translate_taf
from avwx.static.core import FLIGHT_RULES
from avwx.static.taf import TAF_RMK, TAF_NEWLINE, TAF_NEWLINE_STARTSWITH
from avwx.station import uses_na_format, valid_station
from avwx.structs import (
    Cloud,
    Number,
    Sanitization,
    TafData,
    TafLineData,
    TafTrans,
    Timestamp,
    Units,
)


class Taf(Report):
    """
    The Taf class offers an object-oriented approach to managing TAF data for a
    single station.

    ```python
    >>> from avwx import Taf
    >>> kjfk = Taf("KJFK")
    >>> kjfk.station.name
    'John F Kennedy International Airport'
    >>> kjfk.update()
    True
    >>> kjfk.last_updated
    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
    >>> kjfk.raw
    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
    >>> len(kjfk.data.forecast)
    3
    >>> kjfk.data.forecast[0].flight_rules
    'VFR'
    >>> kjfk.translations.forecast[0].wind
    'NNW-330 at 16kt gusting to 27kt'
    >>> kjfk.speech
    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process.

    ```python
    >>> from avwx import Taf
    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
    >>> zyhb = Taf.from_report(report)
    >>> zyhb.station.city
    'Hulan'
    >>> zyhb.data.remarks
    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
    >>> zyhb.summary[-1]
    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
    ```
    """

    data: Optional[TafData] = None
    translations: Optional[TafTrans] = None  # type: ignore

    def _parse_raw(self) -> None:
        """Parse `self.raw` into data, units, sanitization, and translations

        Does nothing when the station code or raw report is missing, and skips
        the translation step when parsing produced no data or units.
        """
        if self.code is None or self.raw is None:
            return
        self.data, self.units, self.sanitization = parse(
            self.code, self.raw, self.issued
        )
        if self.data is None or self.units is None:
            return
        self.translations = translate_taf(self.data, self.units)

    async def _post_update(self) -> None:
        """Async post-processing hook; shares its logic with `_post_parse`"""
        self._parse_raw()

    def _post_parse(self) -> None:
        """Sync post-processing hook; shares its logic with `_post_update`"""
        self._parse_raw()

    @property
    def summary(self) -> List[str]:
        """Condensed summary for each forecast created from translations"""
        if not self.translations:
            self.update()
        if self.translations is None or self.translations.forecast is None:
            return []
        return [summary.taf(trans) for trans in self.translations.forecast]

    @property
    def speech(self) -> Optional[str]:
        """Report summary designed to be read by a text-to-speech program"""
        if not self.data:
            self.update()
        if self.data is None or self.units is None:
            return None
        return speech.taf(self.data, self.units)


# Known misspellings of the TEMPO / BECMG change indicators and their fixes.
# Applied in insertion order by sanitize_line.
LINE_FIXES = {
    "TEMP0": "TEMPO",
    "TEMP O": "TEMPO",
    "TMPO": "TEMPO",
    "TE MPO": "TEMPO",
    "TEMP ": "TEMPO ",
    "T EMPO": "TEMPO",
    " EMPO": " TEMPO",
    "TEMO": "TEMPO",
    "BECM G": "BECMG",
    "BEMCG": "BECMG",
    "BE CMG": "BECMG",
    "B ECMG": "BECMG",
    " BEC ": " BECMG ",
    "BCEMG": "BECMG",
    "BEMG": "BECMG",
}


def sanitize_line(txt: str, sans: Sanitization) -> str:
    """Fixes common mistakes with 'new line' signifiers so that they can be recognized

    Every applied replacement is recorded through `sans.log`, and
    `sans.extra_spaces_needed` is set when a missing space had to be inserted.
    """
    for key, fix in LINE_FIXES.items():
        if key in txt:
            txt = txt.replace(key, fix)
            sans.log(key, fix)
    # Fix when space is missing following new line signifiers
    for item in ["BECMG", "TEMPO"]:
        if item in txt and f"{item} " not in txt:
            index = txt.find(item) + len(item)
            txt = f"{txt[:index]} {txt[index:]}"
            sans.extra_spaces_needed = True
    return txt


def get_taf_remarks(txt: str) -> Tuple[str, str]:
    """Returns report and remarks separated if found

    The remarks string is empty when none of the TAF_RMK markers is located.
    """
    remarks_start = core.find_first_in_list(txt, TAF_RMK)
    if remarks_start == -1:
        return txt, ""
    remarks = txt[remarks_start:]
    txt = txt[:remarks_start].strip()
    return txt, remarks


def get_alt_ice_turb(
    data: List[str],
) -> Tuple[List[str], Optional[Number], List[str], List[str]]:
    """Returns the report list and removed: Altimeter string, Icing list, Turbulence list

    The list is scanned from the end so that popping never shifts an index
    that still has to be visited. As a consequence the icing and turbulence
    lists are filled in reverse order of appearance.
    """
    altimeter_number = None
    icing, turbulence = [], []
    for i, item in reversed(list(enumerate(data))):
        if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit():
            altimeter = data.pop(i)[3:7]
            # A leading 2 or 3 gets a decimal point after the second digit
            if altimeter[0] in ("2", "3"):
                altimeter = f"{altimeter[:2]}.{altimeter[2:]}"
            altimeter_number = core.make_number(altimeter, literal=True)
        elif item.isdigit():
            if item[0] == "6":
                icing.append(data.pop(i))
            elif item[0] == "5":
                turbulence.append(data.pop(i))
    return data, altimeter_number, icing, turbulence


def starts_new_line(item: str) -> bool:
    """Returns True if the given element should start a new report line"""
    if item in TAF_NEWLINE:
        return True
    return any(item.startswith(start) for start in TAF_NEWLINE_STARTSWITH)


def split_taf(txt: str) -> List[str]:
    """Splits a TAF report into each distinct time period

    An element following a PROB token does not start a new line, so the
    probability stays attached to the element it qualifies.
    """
    lines = []
    split = txt.split()
    last_index = 0
    for i, item in enumerate(split):
        if starts_new_line(item) and i != 0 and not split[i - 1].startswith("PROB"):
            lines.append(" ".join(split[last_index:i]))
            last_index = i
    lines.append(" ".join(split[last_index:]))
    return lines


# TAF line report type and start/end times
def get_type_and_times(
    data: List[str],
) -> Tuple[List[str], str, Optional[str], Optional[str], Optional[str]]:
    """Returns the report list and removed:

    Report type string, start time string, end time string, transition start string

    The report type defaults to "FROM". For BECMG lines the parsed start time
    is returned as the transition start and the parsed end time as the start.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    if data:
        # TEMPO, BECMG, INTER
        if data[0] in TAF_NEWLINE:
            report_type = data.pop(0)
        # PROB[30,40]
        elif len(data[0]) == 6 and data[0].startswith("PROB"):
            report_type = data.pop(0)
    if data:
        # 1200/1306
        if (
            len(data[0]) == 9
            and data[0][4] == "/"
            and data[0][:4].isdigit()
            and data[0][5:].isdigit()
        ):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        # NOTE(review): this only fires when exactly 8 elements remain;
        # confirm that length requirement is intentional
        elif (
            len(data) == 8
            and len(data[0]) == 4
            and len(data[1]) == 4
            and data[0].isdigit()
            and data[1].isdigit()
        ):
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif len(data[0]) == 6 and data[0].isdigit() and data[0][-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif len(data[0]) > 7 and data[0].startswith("FM"):
            report_type = "FROM"
            # Split on the first slash only so a malformed token carrying
            # several slashes is left in place instead of failing to unpack
            begin, slash, finish = data[0][2:].partition("/")
            if slash and begin.isdigit() and finish.isdigit():
                start_time, end_time = begin, finish
                data.pop(0)
            elif data[0][2:8].isdigit():
                start_time = data.pop(0)[2:6]
        # TL120600
        if (
            data
            and len(data[0]) > 7
            and data[0].startswith("TL")
            and data[0][2:8].isdigit()
        ):
            end_time = data.pop(0)[2:6]
    if report_type == "BECMG":
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition


def _is_tempo_or_prob(line: TafLineData) -> bool:
    """Returns True if report type is TEMPO or non-null probability"""
    return line.type == "TEMPO" or line.probability is not None


def _get_next_time(lines: List[TafLineData], target: str) -> Optional[Timestamp]:
    """Returns the next normal time target value or empty

    TEMPO and probability lines are skipped. When the target is "start_time",
    a line's transition start takes precedence over its start time.
    """
    for line in lines:
        if _is_tempo_or_prob(line):
            continue
        if target == "start_time":
            time = line.transition_start or getattr(line, target)
        else:
            time = getattr(line, target)
        if time:
            return time
    return None


def find_missing_taf_times(
    lines: List[TafLineData], start: Optional[Timestamp], end: Optional[Timestamp]
) -> List[TafLineData]:
    """Fix any missing time issues (except for error/empty lines)

    The lines are updated in place and the same list is returned.
    """
    if not lines:
        return lines
    # Assign start time
    lines[0].start_time = start
    # Fix other times
    last_fm_line = 0
    for i, line in enumerate(lines):
        if _is_tempo_or_prob(line):
            continue
        last_fm_line = i
        # Search remaining lines to fill empty end or previous for empty start
        for target, other, direc in (("start", "end", -1), ("end", "start", 1)):
            target += "_time"
            if not getattr(line, target):
                setattr(
                    line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")
                )
    # Special case for final forecast
    if last_fm_line:
        lines[last_fm_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines


def get_wind_shear(data: List[str]) -> Tuple[List[str], Optional[str]]:
    """Returns the report list and the removed wind shear

    The "KT" suffix is stripped from the returned wind shear string.
    """
    shear = None
    for i, item in reversed(list(enumerate(data))):
        if len(item) > 6 and item.startswith("WS") and item[5] == "/":
            shear = data.pop(i).replace("KT", "")
    return data, shear


def get_temp_min_and_max(
    data: List[str],
) -> Tuple[List[str], Optional[str], Optional[str]]:
    """Pull out Max temp at time and Min temp at time items from wx list

    Returns the remaining list followed by the max and min temperature
    strings, each None when not found.
    """
    temp_max, temp_min = "", ""
    for i, item in reversed(list(enumerate(data))):
        if len(item) > 6 and item[0] == "T" and "/" in item:
            # TX12/1316Z
            if item[1] == "X":
                temp_max = data.pop(i)
            # TNM03/1404Z
            elif item[1] == "N":
                temp_min = data.pop(i)
            # TM03/1404Z T12/1316Z -> Will fix TN/TX
            elif item[1] == "M" or item[1].isdigit():
                if temp_min:
                    if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int(
                        item[1 : item.find("/")].replace("M", "-")
                    ):
                        temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}"
                    else:
                        temp_max = f"TX{item[1:]}"
                else:
                    temp_min = f"TN{item[1:]}"
                data.pop(i)
    return data, temp_max or None, temp_min or None


def get_oceania_temp_and_alt(data: List[str]) -> Tuple[List[str], List[str], List[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs

    Returns the remaining list, the values following "T", and the values
    following "Q", in that order.
    """
    tlist: List[str] = []
    qlist: List[str] = []
    if "T" in data:
        data, tlist = core.get_digit_list(data, data.index("T"))
    if "Q" in data:
        data, qlist = core.get_digit_list(data, data.index("Q"))
    return data, tlist, qlist


def get_taf_flight_rules(lines: List[TafLineData]) -> List[TafLineData]:
    """Get flight rules by looking for missing data in prior reports"""
    for i, line in enumerate(lines):
        temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False
        # Walk back from the current line, borrowing from non-TEMPO/PROB lines
        for report in reversed(lines[: i + 1]):
            if not _is_tempo_or_prob(report):
                if not temp_vis:
                    temp_vis = report.visibility
                # SKC or CLR should force no clouds instead of looking back
                if "SKC" in report.other or "CLR" in report.other:
                    is_clear = True
                elif temp_vis and temp_vis.repr == "CAVOK":
                    is_clear = True
                elif temp_cloud == []:
                    temp_cloud = report.clouds
                if temp_vis and temp_cloud != []:
                    break
        if is_clear:
            temp_cloud = []
        line.flight_rules = FLIGHT_RULES[
            core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))
        ]
    return lines


def fix_report_header(report: str) -> str:
    """Corrects the header order for key elements

    TAF, AMD, and COR found among the first six elements are moved to the
    front in that order. Whitespace is normalized to single spaces.
    """
    split_report = report.split()

    # Limit scope to only the first few elements. Remarks may include similar tokens
    header_length = min(len(split_report), 6)
    headers = split_report[:header_length]

    fixed_headers = []
    for target in ("TAF", "AMD", "COR"):
        with suppress(ValueError):
            headers.remove(target)
            fixed_headers.append(target)

    return " ".join(fixed_headers + headers + split_report[header_length:])


def parse(
    station: str, report: str, issued: Optional[date] = None
) -> Tuple[Optional[TafData], Optional[Units], Optional[Sanitization]]:
    """Returns TafData and Units dataclasses with parsed data and their associated units

    A Sanitization record is returned as the third element. All three values
    are None when the report is empty.
    """
    # pylint: disable=too-many-locals
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        report = report[4:]
    start_time: Optional[Timestamp] = None
    end_time: Optional[Timestamp] = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: Optional[str] = None
    min_temp: Optional[str] = None
    if parsed_lines:
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: Optional[List[str]] = None
    temps: Optional[List[str]] = None
    # Guard on parsed_lines so a report without forecast lines cannot raise
    # IndexError on the [-1] lookup below
    if parsed_lines and station[0] == "A":
        # get_oceania_temp_and_alt returns temperatures first, then altimeters
        (
            parsed_lines[-1].other,
            temps,
            alts,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans


def parse_lines(
    lines: List[str], units: Units, sans: Sanitization, issued: Optional[date] = None
) -> List[TafLineData]:
    """Returns a list of parsed line dictionaries

    The input list is consumed: each processed element is popped from it.
    """
    parsed_lines: List[TafLineData] = []
    prob = ""
    while lines:
        raw_line = lines[0].strip()
        line = sanitize_line(raw_line, sans)
        # Remove prob from the beginning of a line
        if line.startswith("PROB"):
            # Add standalone prob to next line
            if len(line) == 6:
                prob = line
                line = ""
            # Add to current line
            elif len(line) > 6:
                prob = line[:6]
                line = line[6:].strip()
        if line:
            parsed_line = parse_line(line, units, sans, issued)
            parsed_line.probability = (
                None if " " in prob else core.make_number(prob[4:])
            )
            parsed_line.raw = raw_line
            if prob:
                parsed_line.sanitized = f"{prob} {parsed_line.sanitized}"
            prob = ""
            parsed_lines.append(parsed_line)
        lines.pop(0)
    return parsed_lines


def parse_line(
    line: str, units: Units, sans: Sanitization, issued: Optional[date] = None
) -> TafLineData:
    """Parser for the International TAF forecast variant"""
    # pylint: disable=too-many-locals
    data = core.dedupe(line.split())
    data = clean_taf_list(data, sans)
    sanitized = " ".join(data)
    data, report_type, start_time, end_time, transition = get_type_and_times(data)
    data, wind_shear = get_wind_shear(data)
    (
        data,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(data, units)
    # CAVOK stands in for both visibility and clouds, so neither is parsed
    if "CAVOK" in data:
        visibility = core.make_number("CAVOK")
        clouds: List[Cloud] = []
        data.pop(data.index("CAVOK"))
    else:
        data, visibility = core.get_visibility(data, units)
        data, clouds = core.get_clouds(data)
    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=core.make_timestamp(end_time, target_date=issued),
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=core.make_timestamp(start_time, target_date=issued),
        transition_start=core.make_timestamp(transition, target_date=issued),
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )
class Taf(Report):
    """
    The Taf class offers an object-oriented approach to managing TAF data for a
    single station.

    ```python
    >>> from avwx import Taf
    >>> kjfk = Taf("KJFK")
    >>> kjfk.station.name
    'John F Kennedy International Airport'
    >>> kjfk.update()
    True
    >>> kjfk.last_updated
    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
    >>> kjfk.raw
    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
    >>> len(kjfk.data.forecast)
    3
    >>> kjfk.data.forecast[0].flight_rules
    'VFR'
    >>> kjfk.translations.forecast[0].wind
    'NNW-330 at 16kt gusting to 27kt'
    >>> kjfk.speech
    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process.

    ```python
    >>> from avwx import Taf
    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
    >>> zyhb = Taf.from_report(report)
    >>> zyhb.station.city
    'Hulan'
    >>> zyhb.data.remarks
    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
    >>> zyhb.summary[-1]
    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
    ```
    """

    data: Optional[TafData] = None
    translations: Optional[TafTrans] = None  # type: ignore

    def _parse_raw(self) -> None:
        """Parse `self.raw` into data, units, sanitization, and translations

        Does nothing when the station code or raw report is missing, and skips
        the translation step when parsing produced no data or units.
        """
        if self.code is None or self.raw is None:
            return
        self.data, self.units, self.sanitization = parse(
            self.code, self.raw, self.issued
        )
        if self.data is None or self.units is None:
            return
        self.translations = translate_taf(self.data, self.units)

    async def _post_update(self) -> None:
        """Async post-processing hook; shares its logic with `_post_parse`"""
        self._parse_raw()

    def _post_parse(self) -> None:
        """Sync post-processing hook; shares its logic with `_post_update`"""
        self._parse_raw()

    @property
    def summary(self) -> List[str]:
        """Condensed summary for each forecast created from translations"""
        # Load the report first when nothing has been translated yet
        if not self.translations:
            self.update()
        if self.translations is None or self.translations.forecast is None:
            return []
        return [summary.taf(trans) for trans in self.translations.forecast]

    @property
    def speech(self) -> Optional[str]:
        """Report summary designed to be read by a text-to-speech program"""
        # Load the report first when no parsed data is available yet
        if not self.data:
            self.update()
        if self.data is None or self.units is None:
            return None
        return speech.taf(self.data, self.units)
The Taf class offers an object-oriented approach to managing TAF data for a single station.
>>> from avwx import Taf
>>> kjfk = Taf("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
>>> kjfk.raw
'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
>>> len(kjfk.data.forecast)
3
>>> kjfk.data.forecast[0].flight_rules
'VFR'
>>> kjfk.translations.forecast[0].wind
'NNW-330 at 16kt gusting to 27kt'
>>> kjfk.speech
'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
The `parse` and `from_report` methods can parse a report string if you want to override the normal fetching process.
>>> from avwx import Taf
>>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
>>> zyhb = Taf.from_report(report)
>>> zyhb.station.city
'Hulan'
>>> zyhb.data.remarks
'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
>>> zyhb.summary[-1]
'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
def sanitize_line(txt: str, sans: Sanitization) -> str:
    """Repairs misspelled TEMPO / BECMG markers so the line type can be detected

    Each replacement from LINE_FIXES that applies is recorded via `sans.log`.
    When a marker is present but never followed by a space, one is inserted
    after its first occurrence and `sans.extra_spaces_needed` is set.
    """
    for typo, corrected in LINE_FIXES.items():
        if typo not in txt:
            continue
        txt = txt.replace(typo, corrected)
        sans.log(typo, corrected)
    # Insert the separator that should follow a change indicator
    for keyword in ("BECMG", "TEMPO"):
        if keyword not in txt or f"{keyword} " in txt:
            continue
        head, _, tail = txt.partition(keyword)
        txt = f"{head}{keyword} {tail}"
        sans.extra_spaces_needed = True
    return txt
Fixes common mistakes with 'new line' signifiers so that they can be recognized
def get_taf_remarks(txt: str) -> Tuple[str, str]:
    """Splits a report into its body and its remarks section

    Returns the stripped body and the remarks text starting at the first
    TAF_RMK marker, or the unchanged text and an empty string when no marker
    is found.
    """
    split_at = core.find_first_in_list(txt, TAF_RMK)
    if split_at != -1:
        return txt[:split_at].strip(), txt[split_at:]
    return txt, ""
Returns report and remarks separated if found
def get_alt_ice_turb(
    data: List[str],
) -> Tuple[List[str], Optional[Number], List[str], List[str]]:
    """Extracts altimeter, icing, and turbulence elements from a report list

    Returns the list with those elements removed, the altimeter as a Number
    (None when absent), the icing strings, and the turbulence strings. The
    list is walked backwards so removals never disturb unvisited positions;
    icing and turbulence therefore come out in reverse order of appearance.
    """
    altimeter_number = None
    icing: List[str] = []
    turbulence: List[str] = []
    for position in range(len(data) - 1, -1, -1):
        token = data[position]
        if token.startswith("QNH") and len(token) > 6 and token[3:7].isdigit():
            del data[position]
            value = token[3:7]
            # A leading 2 or 3 gets a decimal point after the second digit
            if value[0] in ("2", "3"):
                value = f"{value[:2]}.{value[2:]}"
            altimeter_number = core.make_number(value, literal=True)
        elif token.isdigit():
            # All-digit groups are sorted by their leading digit
            bucket = {"6": icing, "5": turbulence}.get(token[0])
            if bucket is not None:
                bucket.append(data.pop(position))
    return data, altimeter_number, icing, turbulence
Returns the report list and removed: Altimeter string, Icing list, Turbulence list
def starts_new_line(item: str) -> bool:
    """Tells whether an element marks the beginning of a new report line

    True when the element is one of TAF_NEWLINE or begins with one of the
    TAF_NEWLINE_STARTSWITH prefixes.
    """
    if item in TAF_NEWLINE:
        return True
    for prefix in TAF_NEWLINE_STARTSWITH:
        if item.startswith(prefix):
            return True
    return False
Returns True if the given element should start a new report line
def split_taf(txt: str) -> List[str]:
    """Breaks a TAF report into one string per forecast period

    A new period begins at every element accepted by `starts_new_line`,
    except the very first element and any element directly following a
    PROB token.
    """
    tokens = txt.split()
    # Positions at which a period begins; the first always begins at 0
    boundaries = [0]
    for position in range(1, len(tokens)):
        if tokens[position - 1].startswith("PROB"):
            continue
        if starts_new_line(tokens[position]):
            boundaries.append(position)
    boundaries.append(len(tokens))
    return [
        " ".join(tokens[begin:stop])
        for begin, stop in zip(boundaries, boundaries[1:])
    ]
Splits a TAF report into each distinct time period
def get_type_and_times(
    data: List[str],
) -> Tuple[List[str], str, Optional[str], Optional[str], Optional[str]]:
    """Returns the report list and removed:

    Report type string, start time string, end time string, transition start string

    The report type defaults to "FROM". For BECMG lines the parsed start time
    is returned as the transition start and the parsed end time as the start.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    if data:
        # TEMPO, BECMG, INTER
        if data[0] in TAF_NEWLINE:
            report_type = data.pop(0)
        # PROB[30,40]
        elif len(data[0]) == 6 and data[0].startswith("PROB"):
            report_type = data.pop(0)
    if data:
        # 1200/1306
        if (
            len(data[0]) == 9
            and data[0][4] == "/"
            and data[0][:4].isdigit()
            and data[0][5:].isdigit()
        ):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        # NOTE(review): this only fires when exactly 8 elements remain;
        # confirm that length requirement is intentional
        elif (
            len(data) == 8
            and len(data[0]) == 4
            and len(data[1]) == 4
            and data[0].isdigit()
            and data[1].isdigit()
        ):
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif len(data[0]) == 6 and data[0].isdigit() and data[0][-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif len(data[0]) > 7 and data[0].startswith("FM"):
            report_type = "FROM"
            # Split on the first slash only so a malformed token carrying
            # several slashes is left in place instead of failing to unpack
            begin, slash, finish = data[0][2:].partition("/")
            if slash and begin.isdigit() and finish.isdigit():
                start_time, end_time = begin, finish
                data.pop(0)
            elif data[0][2:8].isdigit():
                start_time = data.pop(0)[2:6]
        # TL120600
        if (
            data
            and len(data[0]) > 7
            and data[0].startswith("TL")
            and data[0][2:8].isdigit()
        ):
            end_time = data.pop(0)[2:6]
    if report_type == "BECMG":
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition
Returns the report list and removed:
Report type string, start time string, end time string, transition start string
def find_missing_taf_times(
    lines: List[TafLineData], start: Optional[Timestamp], end: Optional[Timestamp]
) -> List[TafLineData]:
    """Fills in start and end times that the individual lines do not carry

    The first line always receives `start`. Every line that is neither TEMPO
    nor probability-based borrows a missing start time from the closest
    earlier line's end time and a missing end time from the closest later
    line's start time. The lines are updated in place and the same list is
    returned.
    """
    if not lines:
        return lines
    lines[0].start_time = start
    final_index = 0
    for index, entry in enumerate(lines):
        if _is_tempo_or_prob(entry):
            continue
        final_index = index
        # Nearest neighbours first in both directions
        earlier = lines[:index][::-1]
        later = lines[index + 1 :]
        if not entry.start_time:
            entry.start_time = _get_next_time(earlier, "end_time")
        if not entry.end_time:
            entry.end_time = _get_next_time(later, "start_time")
    # The last regular line, when it is not the first, runs to the report end
    if final_index:
        lines[final_index].end_time = end
    # Restore the overall end on the first line when nothing else filled it
    if not lines[0].end_time:
        lines[0].end_time = end
    return lines
Fix any missing time issues (except for error/empty lines)
def get_wind_shear(data: List[str]) -> Tuple[List[str], Optional[str]]:
    """Removes wind shear elements from the report list

    Returns the same list without its wind shear elements, plus the wind
    shear string with "KT" stripped (None when absent). When several elements
    match, all are removed and the earliest one in the list is reported.
    """
    shear = None
    position = len(data)
    # Walk backwards so deletions never shift an index still to be checked
    while position:
        position -= 1
        token = data[position]
        if token.startswith("WS") and len(token) > 6 and token[5] == "/":
            del data[position]
            shear = token.replace("KT", "")
    return data, shear
Returns the report list and the removed wind shear
def get_temp_min_and_max(
    data: List[str],
) -> Tuple[List[str], Optional[str], Optional[str]]:
    """Extracts the max-temp-at-time and min-temp-at-time items from a wx list

    Returns the list without those items, then the max and min temperature
    strings (None when not found). Items lacking the X/N marker are labelled
    TX or TN by comparing their values.
    """

    def _degrees(token: str, offset: int) -> int:
        # Value between the prefix and the slash; "M" denotes a minus sign
        return int(token[offset : token.find("/")].replace("M", "-"))

    highest, lowest = "", ""
    for position in range(len(data) - 1, -1, -1):
        token = data[position]
        if len(token) <= 6 or token[0] != "T" or "/" not in token:
            continue
        marker = token[1]
        if marker == "X":
            # TX12/1316Z
            highest = data.pop(position)
        elif marker == "N":
            # TNM03/1404Z
            lowest = data.pop(position)
        elif marker == "M" or marker.isdigit():
            # TM03/1404Z T12/1316Z -> unlabelled, decide TN/TX by value
            if not lowest:
                lowest = f"TN{token[1:]}"
            elif _degrees(lowest, 2) > _degrees(token, 1):
                highest, lowest = f"TX{lowest[2:]}", f"TN{token[1:]}"
            else:
                highest = f"TX{token[1:]}"
            data.pop(position)
    return data, highest or None, lowest or None
Pull out Max temp at time and Min temp at time items from wx list
def get_oceania_temp_and_alt(data: List[str]) -> Tuple[List[str], List[str], List[str]]:
    """Collects the temperature and altimeter lists used by Oceania TAFs

    Returns the remaining list, the values following a "T" element, and the
    values following a "Q" element, in that order. A missing marker yields an
    empty list.
    """
    found = {"T": [], "Q": []}
    for marker in ("T", "Q"):
        if marker in data:
            data, found[marker] = core.get_digit_list(data, data.index(marker))
    return data, found["T"], found["Q"]
Get Temperature and Altimeter lists for Oceania TAFs
def get_taf_flight_rules(lines: List[TafLineData]) -> List[TafLineData]:
    """Get flight rules by looking for missing data in prior reports

    Each line's `flight_rules` is set from its own visibility and clouds,
    falling back to earlier non-TEMPO/PROB lines for whichever is missing.
    The lines are updated in place and the same list is returned.
    """
    # NOTE(review): nesting reconstructed from a whitespace-collapsed listing;
    # confirm the `break` check sits inside the non-TEMPO/PROB branch
    for i, line in enumerate(lines):
        temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False
        # Walk back from the current line (inclusive) towards the first one
        for report in reversed(lines[: i + 1]):
            if not _is_tempo_or_prob(report):
                if not temp_vis:
                    temp_vis = report.visibility
                # SKC or CLR should force no clouds instead of looking back
                if "SKC" in report.other or "CLR" in report.other:
                    is_clear = True
                elif temp_vis and temp_vis.repr == "CAVOK":
                    is_clear = True
                elif temp_cloud == []:
                    temp_cloud = report.clouds
                # Stop once both visibility and clouds are known
                if temp_vis and temp_cloud != []:
                    break
        # A clear-sky indication overrides any clouds collected above
        if is_clear:
            temp_cloud = []
        line.flight_rules = FLIGHT_RULES[
            core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))
        ]
    return lines
Get flight rules by looking for missing data in prior reports
def fix_report_header(report: str) -> str:
    """Move the TAF, AMD, and COR header keywords to the front of the report

    Only the first six whitespace-separated elements are inspected because
    remarks further along may include similar tokens. Keywords that are found
    are emitted in the fixed order TAF, AMD, COR, followed by the remaining
    header elements and then the rest of the report.

    The result is re-joined with single spaces.
    """
    tokens = report.split()
    head, tail = tokens[:6], tokens[6:]

    leading = []
    for keyword in ("TAF", "AMD", "COR"):
        if keyword in head:
            # Only the first occurrence is moved; duplicates stay in place
            head.remove(keyword)
            leading.append(keyword)

    return " ".join([*leading, *head, *tail])
Corrects the header order for key elements
def parse(
    station: str, report: str, issued: Optional[date] = None
) -> Tuple[Optional[TafData], Optional[Units], Optional[Sanitization]]:
    """Parse a raw TAF report into structured data

    Args:
        station: Station ident the report belongs to. It is replaced by the
            ident found in the report header when one is detected.
        report: Raw TAF report string.
        issued: Optional date passed as ``target_date`` when building timestamps.

    Returns:
        A tuple of the parsed TafData, the Units used by the report, and the
        Sanitization record of changes made while cleaning the report.
        All three are None when ``report`` is empty.
    """
    # pylint: disable=too-many-locals
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    # Strip the leading report-type keywords now that they are ordered first
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        report = report[4:]
    start_time: Optional[Timestamp] = None
    end_time: Optional[Timestamp] = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: Optional[str] = None
    min_temp: Optional[str] = None
    if parsed_lines:
        # Max/min temps are looked for on the last line first, then the first
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: Optional[List[str]] = None
    temps: Optional[List[str]] = None
    # Guard against reports without forecast lines: there is no last line to read
    if parsed_lines and station.startswith("A"):
        # get_oceania_temp_and_alt returns temperatures BEFORE altimeters
        (
            parsed_lines[-1].other,
            temps,
            alts,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    # Rebuild the sanitized report with the station and time back in front
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans
Returns TafData and Units dataclasses with parsed data and their associated units
def parse_lines(
    lines: List[str], units: Units, sans: Sanitization, issued: Optional[date] = None
) -> List[TafLineData]:
    """Parse each raw TAF line into a TafLineData object

    A leading six-character "PROBnn" group is split off before parsing. When
    the group stands alone on its line, it is carried over and applied to the
    next non-empty line instead.

    ``lines`` is consumed in place: each entry is removed once it has been
    processed, so the list is empty when this function returns.
    """
    results: List[TafLineData] = []
    pending_prob = ""
    while lines:
        raw_line = lines[0].strip()
        text = sanitize_line(raw_line, sans)
        # Remove prob from the beginning of a line. A standalone prob leaves
        # an empty remainder and therefore waits for the next line
        if text.startswith("PROB") and len(text) >= 6:
            pending_prob, text = text[:6], text[6:].strip()
        if text:
            entry = parse_line(text, units, sans, issued)
            if " " in pending_prob:
                entry.probability = None
            else:
                entry.probability = core.make_number(pending_prob[4:])
            entry.raw = raw_line
            if pending_prob:
                entry.sanitized = f"{pending_prob} {entry.sanitized}"
                pending_prob = ""
            results.append(entry)
        del lines[0]
    return results
Returns a list of parsed line dictionaries
def parse_line(
    line: str, units: Units, sans: Sanitization, issued: Optional[date] = None
) -> TafLineData:
    """Parser for the International TAF forecast variant

    The line is split into tokens, de-duplicated, and cleaned. Each extraction
    step then takes the elements it recognises and hands back the remaining
    tokens. Whatever is left after the last step is stored in ``other``.

    ``flight_rules``, ``wx_codes``, and ``probability`` are returned with
    placeholder values ("", [], None); this function does not compute them.
    """
    # pylint: disable=too-many-locals
    tokens = clean_taf_list(core.dedupe(line.split()), sans)
    sanitized = " ".join(tokens)
    tokens, report_type, start_time, end_time, transition = get_type_and_times(tokens)
    tokens, wind_shear = get_wind_shear(tokens)
    (
        tokens,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(tokens, units)
    clouds: List[Cloud]
    if "CAVOK" not in tokens:
        tokens, visibility = core.get_visibility(tokens, units)
        tokens, clouds = core.get_clouds(tokens)
    else:
        # CAVOK stands in for both the visibility and the cloud elements
        visibility = core.make_number("CAVOK")
        clouds = []
        tokens.remove("CAVOK")
    other, altimeter, icing, turbulence = get_alt_ice_turb(tokens)
    end_stamp = core.make_timestamp(end_time, target_date=issued)
    start_stamp = core.make_timestamp(start_time, target_date=issued)
    transition_stamp = core.make_timestamp(transition, target_date=issued)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=end_stamp,
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=start_stamp,
        transition_start=transition_stamp,
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )
Parser for the International TAF forecast variant