avwx.current.taf

A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 5 statute miles from the reporting station. They are updated once every three or six hours or when significant changes warrant an update, and the observations are valid for six hours or until the next report is issued

  1"""
  2A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area
  35 statute miles from the reporting station. They are updated once every three or
  4six hours or when significant changes warrant an update, and the observations
  5are valid for six hours or until the next report is issued
  6"""
  7
  8# stdlib
  9from __future__ import annotations
 10
 11from contextlib import suppress
 12from typing import TYPE_CHECKING
 13
 14# module
 15from avwx.current.base import Report, get_wx_codes
 16from avwx.parsing import core, speech, summary
 17from avwx.parsing.remarks import parse as parse_remarks
 18from avwx.parsing.sanitization.taf import clean_taf_list, clean_taf_string
 19from avwx.parsing.translate.taf import translate_taf
 20from avwx.static.core import FLIGHT_RULES
 21from avwx.static.taf import TAF_NEWLINE, TAF_NEWLINE_STARTSWITH, TAF_RMK
 22from avwx.station import uses_na_format, valid_station
 23from avwx.structs import (
 24    Cloud,
 25    Number,
 26    Sanitization,
 27    TafData,
 28    TafLineData,
 29    TafTrans,
 30    Timestamp,
 31    Units,
 32)
 33
 34if TYPE_CHECKING:
 35    from datetime import date
 36
 37
 38class Taf(Report):
 39    """
 40    The Taf class offers an object-oriented approach to managing TAF data for a
 41    single station.
 42
 43    ```python
 44    >>> from avwx import Taf
 45    >>> kjfk = Taf("KJFK")
 46    >>> kjfk.station.name
 47    'John F Kennedy International Airport'
 48    >>> kjfk.update()
 49    True
 50    >>> kjfk.last_updated
 51    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
 52    >>> kjfk.raw
 53    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
 54    >>> len(kjfk.data.forecast)
 55    3
 56    >>> kjfk.data.forecast[0].flight_rules
 57    'VFR'
 58    >>> kjfk.translations.forecast[0].wind
 59    'NNW-330 at 16kt gusting to 27kt'
 60    >>> kjfk.speech
 61    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
 62    ```
 63
 64    The `parse` and `from_report` methods can parse a report string if you want
 65    to override the normal fetching process.
 66
 67    ```python
 68    >>> from avwx import Taf
 69    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
 70    >>> zyhb = Taf.from_report(report)
 71    True
 72    >>> zyhb.station.city
 73    'Hulan'
 74    >>> zyhb.data.remarks
 75    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
 76    >>> zyhb.summary[-1]
 77    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
 78    ```
 79    """
 80
 81    data: TafData | None = None
 82    translations: TafTrans | None = None  # type: ignore
 83
 84    async def _post_update(self) -> None:
 85        if self.code is None or self.raw is None:
 86            return
 87        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 88        if self.data is None or self.units is None:
 89            return
 90        self.translations = translate_taf(self.data, self.units)
 91
 92    def _post_parse(self) -> None:
 93        if self.code is None or self.raw is None:
 94            return
 95        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 96        if self.data is None or self.units is None:
 97            return
 98        self.translations = translate_taf(self.data, self.units)
 99
100    @property
101    def summary(self) -> list[str]:
102        """Condensed summary for each forecast created from translations."""
103        if not self.translations:
104            self.update()
105        if self.translations is None or self.translations.forecast is None:
106            return []
107        return [summary.taf(trans) for trans in self.translations.forecast]
108
109    @property
110    def speech(self) -> str | None:
111        """Report summary designed to be read by a text-to-speech program."""
112        if not self.data:
113            self.update()
114        if self.data is None or self.units is None:
115            return None
116        return speech.taf(self.data, self.units)
117
118
# Common typos of the TAF change-group signifiers mapped to their corrected
# form. NOTE: `sanitize_line` applies these in insertion order, so more
# specific keys (e.g. "TEMP0") must stay ahead of shorter overlapping
# ones (e.g. "TEMP ").
LINE_FIXES = {
    "TEMP0": "TEMPO",
    "TEMP O": "TEMPO",
    "TMPO": "TEMPO",
    "TE MPO": "TEMPO",
    "TEMP ": "TEMPO ",
    "T EMPO": "TEMPO",
    " EMPO": " TEMPO",
    "TEMO": "TEMPO",
    "BECM G": "BECMG",
    "BEMCG": "BECMG",
    "BE CMG": "BECMG",
    "B ECMG": "BECMG",
    " BEC ": " BECMG ",
    "BCEMG": "BECMG",
    "BEMG": "BECMG",
}
136
137
def sanitize_line(txt: str, sans: Sanitization) -> str:
    """Fix common mistakes with 'new line' signifiers so that they can be recognized.

    Applies the LINE_FIXES typo table (logging each change), then inserts a
    missing space directly after a BECMG/TEMPO keyword.
    """
    for typo, replacement in LINE_FIXES.items():
        if typo not in txt:
            continue
        txt = txt.replace(typo, replacement)
        sans.log(typo, replacement)
    # Insert a space when one is missing right after a change-group keyword
    for keyword in ("BECMG", "TEMPO"):
        if keyword not in txt or f"{keyword} " in txt:
            continue
        split_at = txt.find(keyword) + len(keyword)
        txt = f"{txt[:split_at]} {txt[split_at:]}"
        sans.extra_spaces_needed = True
    return txt
151
152
def get_taf_remarks(txt: str) -> tuple[str, str]:
    """Return report and remarks separated if found.

    The remarks string keeps its leading RMK-style marker; the report text
    is stripped of trailing whitespace.
    """
    remarks_start = core.find_first_in_list(txt, TAF_RMK)
    if remarks_start == -1:
        # No remarks marker found
        return txt, ""
    return txt[:remarks_start].strip(), txt[remarks_start:]
161
162
def get_alt_ice_turb(
    data: list[str],
) -> tuple[list[str], Number | None, list[str], list[str]]:
    """Return the report list and removed: Altimeter string, Icing list, Turbulence list."""
    altimeter_number: Number | None = None
    icing: list[str] = []
    turbulence: list[str] = []
    # Walk backwards so popping never disturbs indexes still to be visited
    for index in range(len(data) - 1, -1, -1):
        token = data[index]
        if len(token) > 6 and token.startswith("QNH") and token[3:7].isdigit():
            raw_altimeter = data.pop(index)[3:7]
            if raw_altimeter[0] in ("2", "3"):
                # Insert the decimal point for inHg values (e.g. 2992 -> 29.92)
                raw_altimeter = f"{raw_altimeter[:2]}.{raw_altimeter[2:]}"
            altimeter_number = core.make_number(raw_altimeter, literal=True)
        elif token.isdigit():
            # 6xxxx = icing code, 5xxxx = turbulence code
            if token[0] == "6":
                icing.append(data.pop(index))
            elif token[0] == "5":
                turbulence.append(data.pop(index))
    return data, altimeter_number, icing, turbulence
181
182
def is_normal_time(item: str) -> bool:
    """Return if the item looks like a valid TAF (1200/1400) time range."""
    if len(item) != 9:
        return False
    start, separator, end = item[:4], item[4], item[5:]
    return separator == "/" and start.isdigit() and end.isdigit()
186
187
def starts_new_line(item: str) -> bool:
    """Return True if the given element should start a new report line."""
    # Exact keyword match or a known keyword prefix
    return item in TAF_NEWLINE or any(item.startswith(prefix) for prefix in TAF_NEWLINE_STARTSWITH)
193
194
def split_taf(txt: str) -> list[str]:
    """Split a TAF report into each distinct time period.

    A new period begins at a change-group keyword (unless the previous token
    is a PROBxx qualifier that belongs with it) or at a stand-alone time
    range that does not directly follow a change-group keyword.
    """
    split = txt.split()
    # Guard: the previous implementation called next() on an enumerate of
    # this list and raised an uncaught StopIteration when the text was empty
    if not split:
        return []
    lines = []
    last_index = 0
    # Start at 1: the first token can never close a preceding period
    for i in range(1, len(split)):
        item = split[i]
        if (starts_new_line(item) and not split[i - 1].startswith("PROB")) or (
            is_normal_time(item) and not starts_new_line(split[i - 1])
        ):
            lines.append(" ".join(split[last_index:i]))
            last_index = i
    lines.append(" ".join(split[last_index:]))
    return lines
210
211
# TAF line report type and start/end times
def get_type_and_times(
    data: list[str],
) -> tuple[list[str], str, str | None, str | None, str | None]:
    """Extract the report type string, start time string, and end time string.

    Returns the remaining tokens plus the report type, start time, end time,
    and (for BECMG groups) the transition start time.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    # TEMPO, BECMG, INTER, or a PROBxx designator
    # BUG FIX: parenthesized so an empty list can never reach data[0]; the
    # original `data and a or b and c` form raised IndexError on empty input
    if data and (data[0] in TAF_NEWLINE or (len(data[0]) == 6 and data[0].startswith("PROB"))):
        report_type = data.pop(0)
    if data:
        item, length = data[0], len(data[0])
        # 1200/1306
        if is_normal_time(item):
            start_time, end_time = data.pop(0).split("/")
        # 1200 1306
        elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit():
            start_time = data.pop(0)
            end_time = data.pop(0)
        # 120000
        elif length == 6 and item.isdigit() and item[-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif length > 7 and item.startswith("FM"):
            report_type = "FROM"
            if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit():
                start_time, end_time = data.pop(0)[2:].split("/")
            elif item[2:8].isdigit():
                start_time = data.pop(0)[2:6]
            # TL120600
            # BUG FIX: check the length of the TL token itself; `length` still
            # referred to the already-consumed FM token
            if data and len(data[0]) > 7 and data[0].startswith("TL") and data[0][2:8].isdigit():
                end_time = data.pop(0)[2:6]
        elif report_type == "BECMG" and length == 5:
            # 1200/
            if item[-1] == "/" and item[:4].isdigit():
                start_time = data.pop(0)[:4]
            # /1200
            elif item[0] == "/" and item[1:].isdigit():
                end_time = data.pop(0)[1:]
    if report_type == "BECMG":
        # For BECMG the parsed "start" is really when the transition begins;
        # the parsed "end" is when the new conditions take effect
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition
255
256
257def _is_tempo_or_prob(line: TafLineData) -> bool:
258    """Return True if report type is TEMPO or non-null probability."""
259    return line.type == "TEMPO" or line.probability is not None
260
261
def _get_next_time(lines: list[TafLineData], target: str) -> Timestamp | None:
    """Return the next normal-line time for *target* ("start_time"/"end_time"), or None."""
    for line in lines:
        # TEMPO/PROB lines overlay the base forecast, so skip them
        if _is_tempo_or_prob(line):
            continue
        if target == "start_time":
            # A BECMG transition start takes precedence when filling start times
            time = line.transition_start or getattr(line, target)
        else:
            time = getattr(line, target)
        if time:
            return time  # type: ignore
    return None
271
272
def find_missing_taf_times(
    lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None
) -> list[TafLineData]:
    """Fix any missing time issues except for error/empty lines.

    Fills each non-TEMPO/PROB line's missing start time from a neighboring
    line's end time (searching backwards) and its missing end time from a
    later line's start time (searching forwards), then anchors the overall
    period to *start* and *end*.
    """
    if not lines:
        return lines
    # Assign start time
    lines[0].start_time = start
    # Fix other times
    last_fm_line = 0  # index of the last non-TEMPO/PROB line seen
    for i, line in enumerate(lines):
        if _is_tempo_or_prob(line):
            continue
        last_fm_line = i
        # Search remaining lines to fill empty end or previous for empty start
        for target, other, direc in (("start", "end", -1), ("end", "start", 1)):
            target += "_time"  # noqa: PLW2901
            if not getattr(line, target):
                # lines[i::direc][1:] walks away from line i in the given direction
                setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time"))
    # Special case for final forecast
    if last_fm_line:
        lines[last_fm_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines
299
300
def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]:
    """Return the report list and the removed wind shear, if any."""
    shear = None
    # Iterate backwards so popping is index-safe
    for index in range(len(data) - 1, -1, -1):
        token = data[index]
        # Wind shear tokens look like WS020/13045KT
        if len(token) > 6 and token.startswith("WS") and token[5] == "/":
            shear = data.pop(index).replace("KT", "")
    return data, shear
308
309
def get_temp_min_and_max(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Pull out Max temp at time and Min temp at time items from wx list."""

    def _value(raw: str, start: int) -> int:
        # Numeric value of a temp token, M meaning minus (e.g. M03 -> -3)
        return int(raw[start : raw.find("/")].replace("M", "-"))

    temp_max, temp_min = "", ""
    # Walk backwards so popping never disturbs indexes still to be visited
    for index in range(len(data) - 1, -1, -1):
        token = data[index]
        if not (len(token) > 6 and token[0] == "T" and "/" in token):
            continue
        # TX12/1316Z
        if token[1] == "X":
            temp_max = data.pop(index)
        # TNM03/1404Z
        elif token[1] == "N":
            temp_min = data.pop(index)
        # TM03/1404Z T12/1316Z -> normalize untagged temps to TN/TX
        elif token[1] == "M" or token[1].isdigit():
            if not temp_min:
                temp_min = f"TN{token[1:]}"
            elif _value(temp_min, 2) > _value(token, 1):
                # The previously captured "min" is warmer: it was really the max
                temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{token[1:]}"
            else:
                temp_max = f"TX{token[1:]}"
            data.pop(index)
    return data, temp_max or None, temp_min or None
336
337
def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs."""
    temperature_list: list[str] = []
    altimeter_list: list[str] = []
    # "T" and "Q" markers precede runs of digit groups
    if "T" in data:
        data, temperature_list = core.get_digit_list(data, data.index("T"))
    if "Q" in data:
        data, altimeter_list = core.get_digit_list(data, data.index("Q"))
    return data, temperature_list, altimeter_list
347
348
def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]:
    """Get flight rules by looking for missing data in prior reports.

    Each line inherits missing visibility/clouds from the nearest preceding
    non-TEMPO/PROB line before computing its flight rules.
    """
    for i, line in enumerate(lines):
        temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False
        # Walk backwards through the base forecast lines (current line first)
        for report in reversed(lines[: i + 1]):
            if not _is_tempo_or_prob(report):
                if not temp_vis:
                    temp_vis = report.visibility
                # SKC or CLR should force no clouds instead of looking back
                if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK":
                    is_clear = True
                elif temp_cloud == []:
                    # == [] (not a falsy check) keeps a deliberate None distinct
                    # from "no clouds reported" — assumed intentional; confirm
                    temp_cloud = report.clouds
                if temp_vis and temp_cloud != []:
                    break
        if is_clear:
            temp_cloud = []
        line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))]
    return lines
368
369
def fix_report_header(report: str) -> str:
    """Correct the header order for key elements.

    Moves TAF / AMD / COR (in that order) to the front of the report when
    they appear among the first few tokens.
    """
    tokens = report.split()

    # Limit scope to only the first few elements; remarks may include similar tokens
    scope = min(len(tokens), 6)
    head, tail = tokens[:scope], tokens[scope:]

    ordered = []
    for keyword in ("TAF", "AMD", "COR"):
        if keyword in head:
            head.remove(keyword)
            ordered.append(keyword)

    return " ".join(ordered + head + tail)
385
386
387def _is_possible_start_end_time_slash(item: str) -> bool:
388    """Return True if item is a possible period start or end with missing element."""
389    return len(item) == 5 and (
390        # 1200/
391        (item[-1] == "/" and item[:4].isdigit())
392        or
393        # /1200
394        (item[0] == "/" and item[1:].isdigit())
395    )
396
397
def parse(
    station: str, report: str, issued: date | None = None
) -> tuple[TafData | None, Units | None, Sanitization | None]:
    """Return TafData and Units dataclasses with parsed data and their associated units.

    Args:
        station: Station ident; used for validation and unit-format selection.
        report: Raw TAF report string.
        issued: Optional issue date used to resolve day/hour timestamps.

    Returns:
        (data, units, sanitization), or (None, None, None) when *report* is empty.
    """
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    is_amended, is_correction = False, False
    # Strip leading TAF / AMD / COR tokens, remembering amendment/correction flags
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        if report[:3] == "AMD":
            is_amended = True
        elif report[:3] == "COR":
            is_correction = True
        report = report[4:]
    start_time: Timestamp | None = None
    end_time: Timestamp | None = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    # Station ident and issue time live within the first few tokens
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    # NOTE(review): replaces every occurrence of the station substring, not
    # just the leading token — assumed safe for real-world reports; confirm
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    if remarks.startswith("AMD"):
        is_amended = True
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: str | None = None
    min_temp: str | None = None
    if parsed_lines:
        # Max/min temps usually trail the last line; fall back to the first line
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: list[str] | None = None
    temps: list[str] | None = None
    # NOTE(review): assumes idents starting with "A" are Oceania stations and
    # that at least one line parsed; parsed_lines[-1] raises on [] — confirm
    if station[0] == "A":
        (
            parsed_lines[-1].other,
            alts,
            temps,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    # Rebuild the sanitized report with station and time reattached
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        is_amended=is_amended,
        is_correction=is_correction,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans
482
483
def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]:
    """Return a list of parsed line dictionaries.

    Consumes *lines* in place (each entry is popped once handled).
    """
    parsed: list[TafLineData] = []
    probability = ""
    while lines:
        raw_line = lines[0].strip()
        clean = sanitize_line(raw_line, sans)
        # Remove a PROBxx qualifier from the beginning of a line
        if clean.startswith("PROB"):
            if len(clean) == 6:
                # Standalone prob applies to the next line
                probability, clean = clean, ""
            elif len(clean) > 6:
                # Prob prefixes the current line
                probability, clean = clean[:6], clean[6:].strip()
        if clean:
            entry = parse_line(clean, units, sans, issued)
            entry.probability = None if " " in probability else core.make_number(probability[4:])
            entry.raw = raw_line
            if probability:
                entry.sanitized = f"{probability} {entry.sanitized}"
            probability = ""
            parsed.append(entry)
        lines.pop(0)
    return parsed
511
512
def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData:
    """Parser for the International TAF forecast variant.

    Extracts, in order: type and start/end times, wind shear, wind, visibility,
    clouds, and altimeter/icing/turbulence; leftover tokens land in `other`.
    """
    data: list[str] = core.dedupe(line.split())
    # Grab original time piece under certain conditions to preserve a useful slash
    old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None
    data = clean_taf_list(data, sans)
    if old_time and len(data) > 1 and data[1] == old_time.strip("/"):
        # Restore the slash the cleaner stripped (e.g. "1200/" kept intact)
        data[1] = old_time
    sanitized = " ".join(data)
    data, report_type, start_time, end_time, transition = get_type_and_times(data)
    data, wind_shear = get_wind_shear(data)
    (
        data,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(data, units)
    if "CAVOK" in data:
        # CAVOK: keep the literal visibility marker and report no cloud layers
        visibility = core.make_number("CAVOK")
        clouds: list[Cloud] = []
        data.pop(data.index("CAVOK"))
    else:
        data, visibility = core.get_visibility(data, units)
        data, clouds = core.get_clouds(data)
    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=core.make_timestamp(end_time, target_date=issued),
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=core.make_timestamp(start_time, target_date=issued),
        transition_start=core.make_timestamp(transition, target_date=issued),
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )
class Taf(avwx.current.base.Report):
 39class Taf(Report):
 40    """
 41    The Taf class offers an object-oriented approach to managing TAF data for a
 42    single station.
 43
 44    ```python
 45    >>> from avwx import Taf
 46    >>> kjfk = Taf("KJFK")
 47    >>> kjfk.station.name
 48    'John F Kennedy International Airport'
 49    >>> kjfk.update()
 50    True
 51    >>> kjfk.last_updated
 52    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
 53    >>> kjfk.raw
 54    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
 55    >>> len(kjfk.data.forecast)
 56    3
 57    >>> kjfk.data.forecast[0].flight_rules
 58    'VFR'
 59    >>> kjfk.translations.forecast[0].wind
 60    'NNW-330 at 16kt gusting to 27kt'
 61    >>> kjfk.speech
 62    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
 63    ```
 64
 65    The `parse` and `from_report` methods can parse a report string if you want
 66    to override the normal fetching process.
 67
 68    ```python
 69    >>> from avwx import Taf
 70    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
 71    >>> zyhb = Taf.from_report(report)
 72    True
 73    >>> zyhb.station.city
 74    'Hulan'
 75    >>> zyhb.data.remarks
 76    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
 77    >>> zyhb.summary[-1]
 78    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
 79    ```
 80    """
 81
 82    data: TafData | None = None
 83    translations: TafTrans | None = None  # type: ignore
 84
 85    async def _post_update(self) -> None:
 86        if self.code is None or self.raw is None:
 87            return
 88        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 89        if self.data is None or self.units is None:
 90            return
 91        self.translations = translate_taf(self.data, self.units)
 92
 93    def _post_parse(self) -> None:
 94        if self.code is None or self.raw is None:
 95            return
 96        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 97        if self.data is None or self.units is None:
 98            return
 99        self.translations = translate_taf(self.data, self.units)
100
101    @property
102    def summary(self) -> list[str]:
103        """Condensed summary for each forecast created from translations."""
104        if not self.translations:
105            self.update()
106        if self.translations is None or self.translations.forecast is None:
107            return []
108        return [summary.taf(trans) for trans in self.translations.forecast]
109
110    @property
111    def speech(self) -> str | None:
112        """Report summary designed to be read by a text-to-speech program."""
113        if not self.data:
114            self.update()
115        if self.data is None or self.units is None:
116            return None
117        return speech.taf(self.data, self.units)

The Taf class offers an object-oriented approach to managing TAF data for a single station.

>>> from avwx import Taf
>>> kjfk = Taf("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
>>> kjfk.raw
'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
>>> len(kjfk.data.forecast)
3
>>> kjfk.data.forecast[0].flight_rules
'VFR'
>>> kjfk.translations.forecast[0].wind
'NNW-330 at 16kt gusting to 27kt'
>>> kjfk.speech
'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'

The parse and from_report methods can parse a report string if you want to override the normal fetching process.

>>> from avwx import Taf
>>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
>>> zyhb = Taf.from_report(report)
True
>>> zyhb.station.city
'Hulan'
>>> zyhb.data.remarks
'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
>>> zyhb.summary[-1]
'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
data: avwx.structs.TafData | None = None
translations: avwx.structs.TafTrans | None = None
summary: list[str]
101    @property
102    def summary(self) -> list[str]:
103        """Condensed summary for each forecast created from translations."""
104        if not self.translations:
105            self.update()
106        if self.translations is None or self.translations.forecast is None:
107            return []
108        return [summary.taf(trans) for trans in self.translations.forecast]

Condensed summary for each forecast created from translations.

speech: str | None
110    @property
111    def speech(self) -> str | None:
112        """Report summary designed to be read by a text-to-speech program."""
113        if not self.data:
114            self.update()
115        if self.data is None or self.units is None:
116            return None
117        return speech.taf(self.data, self.units)

Report summary designed to be read by a text-to-speech program.

LINE_FIXES = {'TEMP0': 'TEMPO', 'TEMP O': 'TEMPO', 'TMPO': 'TEMPO', 'TE MPO': 'TEMPO', 'TEMP ': 'TEMPO ', 'T EMPO': 'TEMPO', ' EMPO': ' TEMPO', 'TEMO': 'TEMPO', 'BECM G': 'BECMG', 'BEMCG': 'BECMG', 'BE CMG': 'BECMG', 'B ECMG': 'BECMG', ' BEC ': ' BECMG ', 'BCEMG': 'BECMG', 'BEMG': 'BECMG'}
def sanitize_line(txt: str, sans: avwx.structs.Sanitization) -> str:
139def sanitize_line(txt: str, sans: Sanitization) -> str:
140    """Fix common mistakes with 'new line' signifiers so that they can be recognized."""
141    for key, fix in LINE_FIXES.items():
142        if key in txt:
143            txt = txt.replace(key, fix)
144            sans.log(key, fix)
145    # Fix when space is missing following new line signifiers
146    for item in ["BECMG", "TEMPO"]:
147        if item in txt and f"{item} " not in txt:
148            index = txt.find(item) + len(item)
149            txt = f"{txt[:index]} {txt[index:]}"
150            sans.extra_spaces_needed = True
151    return txt

Fix common mistakes with 'new line' signifiers so that they can be recognized.

def get_taf_remarks(txt: str) -> tuple[str, str]:
154def get_taf_remarks(txt: str) -> tuple[str, str]:
155    """Return report and remarks separated if found."""
156    remarks_start = core.find_first_in_list(txt, TAF_RMK)
157    if remarks_start == -1:
158        return txt, ""
159    remarks = txt[remarks_start:]
160    txt = txt[:remarks_start].strip()
161    return txt, remarks

Return report and remarks separated if found.

def get_alt_ice_turb( data: list[str]) -> tuple[list[str], avwx.structs.Number | None, list[str], list[str]]:
164def get_alt_ice_turb(
165    data: list[str],
166) -> tuple[list[str], Number | None, list[str], list[str]]:
167    """Return the report list and removed: Altimeter string, Icing list, Turbulence list."""
168    altimeter_number = None
169    icing, turbulence = [], []
170    for i, item in reversed(list(enumerate(data))):
171        if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit():
172            altimeter = data.pop(i)[3:7]
173            if altimeter[0] in ("2", "3"):
174                altimeter = f"{altimeter[:2]}.{altimeter[2:]}"
175            altimeter_number = core.make_number(altimeter, literal=True)
176        elif item.isdigit():
177            if item[0] == "6":
178                icing.append(data.pop(i))
179            elif item[0] == "5":
180                turbulence.append(data.pop(i))
181    return data, altimeter_number, icing, turbulence

Return the report list and removed: Altimeter string, Icing list, Turbulence list.

def is_normal_time(item: str) -> bool:
    """Return if the item looks like a valid TAF (1200/1400) time range."""
    if len(item) != 9 or item[4] != "/":
        return False
    return item[:4].isdigit() and item[5:].isdigit()

Return if the item looks like a valid TAF (1200/1400) time range.

def starts_new_line(item: str) -> bool:
    """Return True if the given element should start a new report line."""
    # Either an exact newline signifier or a known newline prefix (e.g. FM, PROB)
    return item in TAF_NEWLINE or item.startswith(tuple(TAF_NEWLINE_STARTSWITH))

Returns True if the given element should start a new report line

def split_taf(txt: str) -> list[str]:
    """Split a TAF report into each distinct time period.

    Returns an empty list when the report text is empty or whitespace-only.
    """
    split = txt.split()
    # Guard: the eager next() below raised StopIteration on an empty report
    if not split:
        return []
    lines = []
    last_index = 0
    e_splits = enumerate(split)
    # The first token always belongs to the first line; skip it
    next(e_splits)
    for i, item in e_splits:
        # A new period starts on a newline signifier (unless preceded by PROB)
        # or on a bare time range not already claimed by a signifier
        if (starts_new_line(item) and not split[i - 1].startswith("PROB")) or (
            is_normal_time(item) and not starts_new_line(split[i - 1])
        ):
            lines.append(" ".join(split[last_index:i]))
            last_index = i
    lines.append(" ".join(split[last_index:]))
    return lines

Split a TAF report into each distinct time period.

def get_type_and_times(
    data: list[str],
) -> tuple[list[str], str, str | None, str | None, str | None]:
    """Extract the report type string, start time string, and end time string.

    Returns the remaining report list, the line type ("FROM" by default),
    start time, end time, and the BECMG transition start time.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    # TEMPO, BECMG, INTER, or a standalone PROBxx token
    # Parenthesized so an empty list short-circuits; the previous
    # `data and A or B and C` form evaluated data[0] and raised IndexError
    # when data was empty.
    if data and (data[0] in TAF_NEWLINE or (len(data[0]) == 6 and data[0].startswith("PROB"))):
        report_type = data.pop(0)
    if data:
        item, length = data[0], len(data[0])
        # 1200/1306
        if is_normal_time(item):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit():
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif length == 6 and item.isdigit() and item[-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif length > 7 and item.startswith("FM"):
            report_type = "FROM"
            if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit():
                start_time, end_time = data.pop(0)[2:].split("/")
            elif item[2:8].isdigit():
                start_time = data.pop(0)[2:6]
            # TL120600
            # NOTE(review): `length` here is still the FM item's length (always > 7
            # in this branch), presumably meant to be len(data[0]) — confirm intent
            if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit():
                end_time = data.pop(0)[2:6]
        elif report_type == "BECMG" and length == 5:
            # 1200/
            if item[-1] == "/" and item[:4].isdigit():
                start_time = data.pop(0)[:4]
            # /1200
            elif item[0] == "/" and item[1:].isdigit():
                end_time = data.pop(0)[1:]
    if report_type == "BECMG":
        # BECMG lines describe a transition window; its start becomes the transition time
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition

Extract the report type string, start time string, and end time string.

def find_missing_taf_times(
    lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None
) -> list[TafLineData]:
    """Fix any missing time issues except for error/empty lines."""
    if not lines:
        return lines
    # The first line always inherits the report's start time
    lines[0].start_time = start
    last_main_line = 0
    for index, line in enumerate(lines):
        # TEMPO/PROB lines do not anchor the timeline
        if _is_tempo_or_prob(line):
            continue
        last_main_line = index
        # Fill an empty start from the previous line's end, and an empty end
        # from the next line's start
        for attr, neighbor_attr, step in (
            ("start_time", "end_time", -1),
            ("end_time", "start_time", 1),
        ):
            if getattr(line, attr):
                continue
            setattr(line, attr, _get_next_time(lines[index::step][1:], neighbor_attr))
    # Special case for the final forecast period
    if last_main_line:
        lines[last_main_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines

Fix any missing time issues except for error/empty lines.

def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]:
302def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]:
303    """Return the report list and the remove wind shear."""
304    shear = None
305    for i, item in reversed(list(enumerate(data))):
306        if len(item) > 6 and item.startswith("WS") and item[5] == "/":
307            shear = data.pop(i).replace("KT", "")
308    return data, shear

Return the report list and the removed wind shear.

def get_temp_min_and_max(data: list[str]) -> tuple[list[str], str | None, str | None]:
311def get_temp_min_and_max(
312    data: list[str],
313) -> tuple[list[str], str | None, str | None]:
314    """Pull out Max temp at time and Min temp at time items from wx list."""
315    temp_max, temp_min = "", ""
316    for i, item in reversed(list(enumerate(data))):
317        if len(item) > 6 and item[0] == "T" and "/" in item:
318            # TX12/1316Z
319            if item[1] == "X":
320                temp_max = data.pop(i)
321            # TNM03/1404Z
322            elif item[1] == "N":
323                temp_min = data.pop(i)
324            # TM03/1404Z T12/1316Z -> Will fix TN/TX
325            elif item[1] == "M" or item[1].isdigit():
326                if temp_min:
327                    if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int(
328                        item[1 : item.find("/")].replace("M", "-")
329                    ):
330                        temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}"
331                    else:
332                        temp_max = f"TX{item[1:]}"
333                else:
334                    temp_min = f"TN{item[1:]}"
335                data.pop(i)
336    return data, temp_max or None, temp_min or None

Pull out Max temp at time and Min temp at time items from wx list.

def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs."""
    temps: list[str] = []
    alts: list[str] = []
    # A bare "T" marker precedes the temperature digit group
    if "T" in data:
        data, temps = core.get_digit_list(data, data.index("T"))
    # A bare "Q" marker precedes the altimeter digit group
    if "Q" in data:
        data, alts = core.get_digit_list(data, data.index("Q"))
    return data, temps, alts

Get Temperature and Altimeter lists for Oceania TAFs.

def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]:
    """Get flight rules by looking for missing data in prior reports."""
    for index, line in enumerate(lines):
        visibility, clouds, clear_skies = line.visibility, line.clouds, False
        # Search backwards through non-TEMPO/PROB lines to fill missing values
        for previous in reversed(lines[: index + 1]):
            if _is_tempo_or_prob(previous):
                continue
            if not visibility:
                visibility = previous.visibility
            # SKC or CLR should force no clouds instead of looking back
            if "SKC" in previous.other or "CLR" in previous.other or visibility and visibility.repr == "CAVOK":
                clear_skies = True
            elif clouds == []:
                clouds = previous.clouds
            if visibility and clouds != []:
                break
        if clear_skies:
            clouds = []
        line.flight_rules = FLIGHT_RULES[core.get_flight_rules(visibility, core.get_ceiling(clouds))]
    return lines

Get flight rules by looking for missing data in prior reports.

def fix_report_header(report: str) -> str:
    """Correct the header order for key elements."""
    tokens = report.split()

    # Limit scope to only the first few elements. Remarks may include similar tokens
    scope = min(len(tokens), 6)
    header, remainder = tokens[:scope], tokens[scope:]

    # Pull the key tokens to the front in canonical order
    ordered = []
    for keyword in ("TAF", "AMD", "COR"):
        if keyword in header:
            header.remove(keyword)
            ordered.append(keyword)

    return " ".join(ordered + header + remainder)

Correct the header order for key elements.

def parse(
    station: str, report: str, issued: date | None = None
) -> tuple[TafData | None, Units | None, Sanitization | None]:
    """Return TafData and Units dataclasses with parsed data and their associated units.

    Args:
        station: ICAO station ident; may be replaced by one found in the report.
        report: The raw TAF report string.
        issued: Optional issue date used when resolving timestamps.

    Returns:
        (TafData, Units, Sanitization), or (None, None, None) for an empty report.
    """
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    # Strip leading TAF / AMD / COR tokens, recording amendment/correction state
    is_amended, is_correction = False, False
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        if report[:3] == "AMD":
            is_amended = True
        elif report[:3] == "COR":
            is_correction = True
        report = report[4:]
    start_time: Timestamp | None = None
    end_time: Timestamp | None = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    if remarks.startswith("AMD"):
        is_amended = True
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: str | None = None
    min_temp: str | None = None
    if parsed_lines:
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: list[str] | None = None
    temps: list[str] | None = None
    # Guard on parsed_lines: previously an empty forecast raised IndexError
    # on parsed_lines[-1] for "A"-prefixed (Oceania) stations
    if parsed_lines and station[0] == "A":
        (
            parsed_lines[-1].other,
            alts,
            temps,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        is_amended=is_amended,
        is_correction=is_correction,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans

Return TafData and Units dataclasses with parsed data and their associated units.

def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]:
    """Return a list of parsed line dictionaries."""
    parsed_lines: list[TafLineData] = []
    prob = ""
    # Consume the list destructively so callers see it emptied, as before
    while lines:
        raw_line = lines[0].strip()
        line = sanitize_line(raw_line, sans)
        # Remove prob from the beginning of a line
        if line.startswith("PROB"):
            if len(line) == 6:
                # A standalone PROBxx applies to the following line
                prob, line = line, ""
            elif len(line) > 6:
                # An inline PROBxx applies to the remainder of this line
                prob, line = line[:6], line[6:].strip()
        if line:
            parsed_line = parse_line(line, units, sans, issued)
            parsed_line.probability = None if " " in prob else core.make_number(prob[4:])
            parsed_line.raw = raw_line
            if prob:
                parsed_line.sanitized = f"{prob} {parsed_line.sanitized}"
            prob = ""
            parsed_lines.append(parsed_line)
        lines.pop(0)
    return parsed_lines

Return a list of parsed line dictionaries.

def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData:
    """Parser for the International TAF forecast variant."""
    data: list[str] = core.dedupe(line.split())
    # Grab original time piece under certain conditions to preserve a useful slash
    old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None
    data = clean_taf_list(data, sans)
    if old_time and len(data) > 1 and data[1] == old_time.strip("/"):
        data[1] = old_time
    sanitized = " ".join(data)
    # Strip out each element group in turn, leaving the leftovers in `data`
    data, report_type, start_time, end_time, transition = get_type_and_times(data)
    data, wind_shear = get_wind_shear(data)
    data, wind_direction, wind_speed, wind_gust, wind_variable_direction = core.get_wind(data, units)
    if "CAVOK" in data:
        # Ceiling And Visibility OK: fixed visibility value and no cloud layers
        visibility = core.make_number("CAVOK")
        clouds: list[Cloud] = []
        data.remove("CAVOK")
    else:
        data, visibility = core.get_visibility(data, units)
        data, clouds = core.get_clouds(data)
    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=core.make_timestamp(end_time, target_date=issued),
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=core.make_timestamp(start_time, target_date=issued),
        transition_start=core.make_timestamp(transition, target_date=issued),
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )

Parser for the International TAF forecast variant.