avwx.current.taf

A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 5 statute miles from the reporting station. They are updated once every three or six hours or when significant changes warrant an update, and the observations are valid for six hours or until the next report is issued

  1"""
  2A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area
  35 statute miles from the reporting station. They are updated once every three or
  4six hours or when significant changes warrant an update, and the observations
  5are valid for six hours or until the next report is issued
  6"""
  7
  8# stdlib
  9from __future__ import annotations
 10
 11from contextlib import suppress
 12from typing import TYPE_CHECKING
 13
 14# module
 15from avwx.current.base import Report, get_wx_codes
 16from avwx.parsing import core, speech, summary
 17from avwx.parsing.remarks import parse as parse_remarks
 18from avwx.parsing.sanitization.taf import clean_taf_list, clean_taf_string
 19from avwx.parsing.translate.taf import translate_taf
 20from avwx.static.core import FLIGHT_RULES
 21from avwx.static.taf import TAF_NEWLINE, TAF_NEWLINE_STARTSWITH, TAF_RMK
 22from avwx.station import uses_na_format, valid_station
 23from avwx.structs import (
 24    Cloud,
 25    Number,
 26    Sanitization,
 27    TafData,
 28    TafLineData,
 29    TafTrans,
 30    Timestamp,
 31    Units,
 32)
 33
 34if TYPE_CHECKING:
 35    from datetime import date
 36
 37
 38class Taf(Report):
 39    """
 40    The Taf class offers an object-oriented approach to managing TAF data for a
 41    single station.
 42
 43    ```python
 44    >>> from avwx import Taf
 45    >>> kjfk = Taf("KJFK")
 46    >>> kjfk.station.name
 47    'John F Kennedy International Airport'
 48    >>> kjfk.update()
 49    True
 50    >>> kjfk.last_updated
 51    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
 52    >>> kjfk.raw
 53    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
 54    >>> len(kjfk.data.forecast)
 55    3
 56    >>> kjfk.data.forecast[0].flight_rules
 57    'VFR'
 58    >>> kjfk.translations.forecast[0].wind
 59    'NNW-330 at 16kt gusting to 27kt'
 60    >>> kjfk.speech
 61    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
 62    ```
 63
 64    The `parse` and `from_report` methods can parse a report string if you want
 65    to override the normal fetching process.
 66
 67    ```python
 68    >>> from avwx import Taf
 69    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
 70    >>> zyhb = Taf.from_report(report)
 71    True
 72    >>> zyhb.station.city
 73    'Hulan'
 74    >>> zyhb.data.remarks
 75    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
 76    >>> zyhb.summary[-1]
 77    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
 78    ```
 79    """
 80
 81    data: TafData | None = None
 82    translations: TafTrans | None = None  # type: ignore
 83
 84    async def _post_update(self) -> None:
 85        if self.code is None or self.raw is None:
 86            return
 87        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 88        if self.data is None or self.units is None:
 89            return
 90        self.translations = translate_taf(self.data, self.units)
 91
 92    def _post_parse(self) -> None:
 93        if self.code is None or self.raw is None:
 94            return
 95        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 96        if self.data is None or self.units is None:
 97            return
 98        self.translations = translate_taf(self.data, self.units)
 99
100    @property
101    def summary(self) -> list[str]:
102        """Condensed summary for each forecast created from translations."""
103        if not self.translations:
104            self.update()
105        if self.translations is None or self.translations.forecast is None:
106            return []
107        return [summary.taf(trans) for trans in self.translations.forecast]
108
109    @property
110    def speech(self) -> str | None:
111        """Report summary designed to be read by a text-to-speech program."""
112        if not self.data:
113            self.update()
114        if self.data is None or self.units is None:
115            return None
116        return speech.taf(self.data, self.units)
117
118
# Common typo corrections for change-group ("new line") signifiers
LINE_FIXES = {
    "TEMP0": "TEMPO",
    "TEMP O": "TEMPO",
    "TMPO": "TEMPO",
    "TE MPO": "TEMPO",
    "TEMP ": "TEMPO ",
    "T EMPO": "TEMPO",
    " EMPO": " TEMPO",
    "TEMO": "TEMPO",
    "BECM G": "BECMG",
    "BEMCG": "BECMG",
    "BE CMG": "BECMG",
    "B ECMG": "BECMG",
    " BEC ": " BECMG ",
    "BCEMG": "BECMG",
    "BEMG": "BECMG",
}


def sanitize_line(txt: str, sans: Sanitization) -> str:
    """Fix common mistakes with 'new line' signifiers so that they can be recognized."""
    for mistake, correction in LINE_FIXES.items():
        if mistake not in txt:
            continue
        txt = txt.replace(mistake, correction)
        sans.log(mistake, correction)
    # Insert the space that is sometimes missing after a new-line signifier
    for signifier in ("BECMG", "TEMPO"):
        if signifier in txt and f"{signifier} " not in txt:
            cut = txt.find(signifier) + len(signifier)
            txt = txt[:cut] + " " + txt[cut:]
            sans.extra_spaces_needed = True
    return txt
151
152
def get_taf_remarks(txt: str) -> tuple[str, str]:
    """Split the remarks section off of the report if one is present."""
    start = core.find_first_in_list(txt, TAF_RMK)
    if start == -1:
        # No remarks signifier found; everything is the report body
        return txt, ""
    return txt[:start].strip(), txt[start:]
161
162
def get_alt_ice_turb(
    data: list[str],
) -> tuple[list[str], Number | None, list[str], list[str]]:
    """Return the report list and removed: Altimeter string, Icing list, Turbulence list."""
    altimeter_number = None
    icing: list[str] = []
    turbulence: list[str] = []
    # Walk backwards so popping doesn't shift unvisited indexes
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        if item.startswith("QNH") and len(item) > 6 and item[3:7].isdigit():
            value = data.pop(index)[3:7]
            # 2xxx/3xxx values are inHg reported without the decimal point
            if value[0] in ("2", "3"):
                value = f"{value[:2]}.{value[2:]}"
            altimeter_number = core.make_number(value, literal=True)
        elif item.isdigit():
            if item[0] == "6":
                icing.append(data.pop(index))
            elif item[0] == "5":
                turbulence.append(data.pop(index))
    return data, altimeter_number, icing, turbulence
181
182
def is_normal_time(item: str) -> bool:
    """Return if the item looks like a valid TAF (1200/1400) time range."""
    if len(item) != 9 or item[4] != "/":
        return False
    start, end = item[:4], item[5:]
    return start.isdigit() and end.isdigit()
186
187
def starts_new_line(item: str) -> bool:
    """Returns True if the given element should start a new report line"""
    return item in TAF_NEWLINE or any(item.startswith(prefix) for prefix in TAF_NEWLINE_STARTSWITH)
193
194
def split_taf(txt: str) -> list[str]:
    """Split a TAF report into each distinct time period.

    A new period starts at a change-group signifier (unless preceded by a
    PROB token, which belongs to it) or at a standalone 1200/1400-style
    time range that isn't already attached to a signifier.
    """
    split = txt.split()
    # Guard the empty report: the original iterator-advance raised StopIteration here
    if not split:
        return [""]
    lines = []
    last_index = 0
    # Start at 1 so each check can look back at the previous token
    for i in range(1, len(split)):
        item, prev = split[i], split[i - 1]
        if (starts_new_line(item) and not prev.startswith("PROB")) or (
            is_normal_time(item) and not starts_new_line(prev)
        ):
            lines.append(" ".join(split[last_index:i]))
            last_index = i
    lines.append(" ".join(split[last_index:]))
    return lines
210
211
# TAF line report type and start/end times
def get_type_and_times(
    data: list[str],
) -> tuple[list[str], str, str | None, str | None, str | None]:
    """Extract the report type string, start time string, and end time string.

    Returns the remaining data list plus the report type, start time,
    end time, and (for BECMG) transition time strings popped from the
    front of the line.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    # TEMPO, BECMG, INTER, or PROBxx
    # NOTE: parenthesized so an empty data list short-circuits before
    # len(data[0]) is evaluated (previously raised IndexError)
    if data and (data[0] in TAF_NEWLINE or (len(data[0]) == 6 and data[0].startswith("PROB"))):
        report_type = data.pop(0)
    if data:
        item, length = data[0], len(data[0])
        # 1200/1306
        if is_normal_time(item):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit():
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif length == 6 and item.isdigit() and item[-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif length > 7 and item.startswith("FM"):
            report_type = "FROM"
            if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit():
                start_time, end_time = data.pop(0)[2:].split("/")
            elif item[2:8].isdigit():
                start_time = data.pop(0)[2:6]
            # TL120600
            if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit():
                end_time = data.pop(0)[2:6]
        elif report_type == "BECMG" and length == 5:
            # 1200/
            if item[-1] == "/" and item[:4].isdigit():
                start_time = data.pop(0)[:4]
            # /1200
            elif item[0] == "/" and item[1:].isdigit():
                end_time = data.pop(0)[1:]
    # BECMG lines report the transition window, not the valid period
    if report_type == "BECMG":
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition
255
256
257def _is_tempo_or_prob(line: TafLineData) -> bool:
258    """Return True if report type is TEMPO or non-null probability."""
259    return line.type == "TEMPO" or line.probability is not None
260
261
def _get_next_time(lines: list[TafLineData], target: str) -> Timestamp | None:
    """Returns the next normal time target value or empty"""
    for line in lines:
        # TEMPO/PROB lines overlay a base period and don't anchor times
        if _is_tempo_or_prob(line):
            continue
        if target == "start_time":
            # A BECMG transition start stands in for a missing start time
            time = line.transition_start or getattr(line, target)
        else:
            time = getattr(line, target)
        if time:
            return time  # type: ignore
    return None
271
272
def find_missing_taf_times(
    lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None
) -> list[TafLineData]:
    """Fix any missing time issues except for error/empty lines.

    Mutates the given lines in place: borrows missing start/end times from
    neighboring non-TEMPO/PROB lines, then pins the report-level start and
    end boundaries onto the first and last anchoring lines.
    """
    if not lines:
        return lines
    # Assign start time
    lines[0].start_time = start
    # Fix other times
    last_fm_line = 0
    for i, line in enumerate(lines):
        # TEMPO/PROB lines overlay a base period and can't anchor times
        if _is_tempo_or_prob(line):
            continue
        last_fm_line = i
        # Search remaining lines to fill empty end or previous for empty start
        for target, other, direc in (("start", "end", -1), ("end", "start", 1)):
            target += "_time"  # noqa: PLW2901
            if not getattr(line, target):
                # Walk outward in the given direction (backwards for start,
                # forwards for end) and take the first usable boundary
                setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time"))
    # Special case for final forecast
    if last_fm_line:
        lines[last_fm_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines
299
300
def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]:
    """Return the report list and the removed wind shear string."""
    shear = None
    # Walk backwards so popping doesn't shift unvisited indexes
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        # WS020/13045KT -> WS020/13045
        if item.startswith("WS") and len(item) > 6 and item[5] == "/":
            shear = data.pop(index).replace("KT", "")
    return data, shear
308
309
def get_temp_min_and_max(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Pull out Max temp at time and Min temp at time items from wx list."""
    temp_max = ""
    temp_min = ""
    # Walk backwards so popping doesn't shift unvisited indexes
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        if not (len(item) > 6 and item[0] == "T" and "/" in item):
            continue
        # TX12/1316Z
        if item[1] == "X":
            temp_max = data.pop(index)
        # TNM03/1404Z
        elif item[1] == "N":
            temp_min = data.pop(index)
        # TM03/1404Z T12/1316Z -> Will fix TN/TX
        elif item[1] == "M" or item[1].isdigit():
            if temp_min:
                # Compare numeric values (M prefix means negative)
                prior = int(temp_min[2 : temp_min.find("/")].replace("M", "-"))
                current = int(item[1 : item.find("/")].replace("M", "-"))
                if prior > current:
                    temp_max = f"TX{temp_min[2:]}"
                    temp_min = f"TN{item[1:]}"
                else:
                    temp_max = f"TX{item[1:]}"
            else:
                temp_min = f"TN{item[1:]}"
            data.pop(index)
    return data, temp_max or None, temp_min or None
336
337
def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs."""
    temps: list[str] = []
    alts: list[str] = []
    # "T" and "Q" markers each start a run of digit values
    if "T" in data:
        data, temps = core.get_digit_list(data, data.index("T"))
    if "Q" in data:
        data, alts = core.get_digit_list(data, data.index("Q"))
    return data, temps, alts
347
348
def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]:
    """Get flight rules by looking for missing data in prior reports.

    Each line inherits visibility and cloud values from the closest
    preceding non-TEMPO/PROB line when its own values are missing, then
    gets a flight-rules string computed from the filled-in values.
    """
    for i, line in enumerate(lines):
        temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False
        # Scan this line, then backwards through earlier lines, for values
        for report in reversed(lines[: i + 1]):
            if not _is_tempo_or_prob(report):
                if not temp_vis:
                    temp_vis = report.visibility
                # SKC or CLR should force no clouds instead of looking back
                if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK":
                    is_clear = True
                elif temp_cloud == []:
                    # NOTE(review): `== []` rather than a falsy check — appears
                    # intended to treat a None placeholder differently; confirm
                    temp_cloud = report.clouds
                if temp_vis and temp_cloud != []:
                    break
        if is_clear:
            temp_cloud = []
        line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))]
    return lines
368
369
def fix_report_header(report: str) -> str:
    """Correct the header order for key elements."""
    tokens = report.split()

    # Limit scope to only the first few elements. Remarks may include similar tokens
    scope = min(len(tokens), 6)
    head, tail = tokens[:scope], tokens[scope:]

    # Pull recognized header tokens to the front in canonical order
    ordered = []
    for target in ("TAF", "AMD", "COR"):
        with suppress(ValueError):
            head.remove(target)
            ordered.append(target)

    return " ".join(ordered + head + tail)
385
386
387def _is_possible_start_end_time_slash(item: str) -> bool:
388    """Return True if item is a possible period start or end with missing element."""
389    return len(item) == 5 and (
390        # 1200/
391        (item[-1] == "/" and item[:4].isdigit())
392        or
393        # /1200
394        (item[0] == "/" and item[1:].isdigit())
395    )
396
397
def parse(
    station: str, report: str, issued: date | None = None
) -> tuple[TafData | None, Units | None, Sanitization | None]:
    """Return TafData and Units dataclasses with parsed data and their associated units.

    Args:
        station: ICAO station ident used for validation and unit detection
        report: The raw TAF report string
        issued: Optional issue date used to resolve partial timestamps

    Returns:
        (TafData, Units, Sanitization), or (None, None, None) for an empty report
    """
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    # Strip leading header tokens so the station ident comes first
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        report = report[4:]
    start_time: Timestamp | None = None
    end_time: Timestamp | None = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: str | None = None
    min_temp: str | None = None
    if parsed_lines:
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: list[str] | None = None
    temps: list[str] | None = None
    # NOTE: guard on parsed_lines — an empty forecast previously raised
    # IndexError here for Oceania ("A"-prefixed) stations
    if parsed_lines and station[0] == "A":
        (
            parsed_lines[-1].other,
            alts,
            temps,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans
473
474
def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]:
    """Return a list of parsed line dictionaries.

    Consumes the given list of line strings as it goes.
    """
    parsed: list[TafLineData] = []
    prob = ""
    while lines:
        raw_line = lines.pop(0).strip()
        line = sanitize_line(raw_line, sans)
        # Remove prob from the beginning of a line
        if line.startswith("PROB"):
            if len(line) == 6:
                # A standalone prob carries over to the next line
                prob = line
                line = ""
            elif len(line) > 6:
                # A prob attached to this line applies to it directly
                prob = line[:6]
                line = line[6:].strip()
        if not line:
            continue
        parsed_line = parse_line(line, units, sans, issued)
        parsed_line.probability = None if " " in prob else core.make_number(prob[4:])
        parsed_line.raw = raw_line
        if prob:
            parsed_line.sanitized = f"{prob} {parsed_line.sanitized}"
        prob = ""
        parsed.append(parsed_line)
    return parsed
502
503
def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData:
    """Parser for the International TAF forecast variant.

    Strips recognized elements (type/times, wind shear, wind, visibility,
    clouds, altimeter, icing, turbulence) from the line in order, leaving
    unrecognized tokens in `other`.
    """
    data: list[str] = core.dedupe(line.split())
    # Grab original time piece under certain conditions to preserve a useful slash
    old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None
    data = clean_taf_list(data, sans)
    # Restore the slash the cleaner stripped so the partial period is kept
    if old_time and len(data) > 1 and data[1] == old_time.strip("/"):
        data[1] = old_time
    sanitized = " ".join(data)
    data, report_type, start_time, end_time, transition = get_type_and_times(data)
    data, wind_shear = get_wind_shear(data)
    (
        data,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(data, units)
    # CAVOK implies unlimited visibility and no significant clouds
    if "CAVOK" in data:
        visibility = core.make_number("CAVOK")
        clouds: list[Cloud] = []
        data.pop(data.index("CAVOK"))
    else:
        data, visibility = core.get_visibility(data, units)
        data, clouds = core.get_clouds(data)
    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=core.make_timestamp(end_time, target_date=issued),
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=core.make_timestamp(start_time, target_date=issued),
        transition_start=core.make_timestamp(transition, target_date=issued),
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )
class Taf(avwx.current.base.Report):
 39class Taf(Report):
 40    """
 41    The Taf class offers an object-oriented approach to managing TAF data for a
 42    single station.
 43
 44    ```python
 45    >>> from avwx import Taf
 46    >>> kjfk = Taf("KJFK")
 47    >>> kjfk.station.name
 48    'John F Kennedy International Airport'
 49    >>> kjfk.update()
 50    True
 51    >>> kjfk.last_updated
 52    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
 53    >>> kjfk.raw
 54    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
 55    >>> len(kjfk.data.forecast)
 56    3
 57    >>> kjfk.data.forecast[0].flight_rules
 58    'VFR'
 59    >>> kjfk.translations.forecast[0].wind
 60    'NNW-330 at 16kt gusting to 27kt'
 61    >>> kjfk.speech
 62    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
 63    ```
 64
 65    The `parse` and `from_report` methods can parse a report string if you want
 66    to override the normal fetching process.
 67
 68    ```python
 69    >>> from avwx import Taf
 70    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
 71    >>> zyhb = Taf.from_report(report)
 72    True
 73    >>> zyhb.station.city
 74    'Hulan'
 75    >>> zyhb.data.remarks
 76    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
 77    >>> zyhb.summary[-1]
 78    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
 79    ```
 80    """
 81
 82    data: TafData | None = None
 83    translations: TafTrans | None = None  # type: ignore
 84
 85    async def _post_update(self) -> None:
 86        if self.code is None or self.raw is None:
 87            return
 88        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 89        if self.data is None or self.units is None:
 90            return
 91        self.translations = translate_taf(self.data, self.units)
 92
 93    def _post_parse(self) -> None:
 94        if self.code is None or self.raw is None:
 95            return
 96        self.data, self.units, self.sanitization = parse(self.code, self.raw, self.issued)
 97        if self.data is None or self.units is None:
 98            return
 99        self.translations = translate_taf(self.data, self.units)
100
101    @property
102    def summary(self) -> list[str]:
103        """Condensed summary for each forecast created from translations."""
104        if not self.translations:
105            self.update()
106        if self.translations is None or self.translations.forecast is None:
107            return []
108        return [summary.taf(trans) for trans in self.translations.forecast]
109
110    @property
111    def speech(self) -> str | None:
112        """Report summary designed to be read by a text-to-speech program."""
113        if not self.data:
114            self.update()
115        if self.data is None or self.units is None:
116            return None
117        return speech.taf(self.data, self.units)

The Taf class offers an object-oriented approach to managing TAF data for a single station.

>>> from avwx import Taf
>>> kjfk = Taf("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
>>> kjfk.raw
'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
>>> len(kjfk.data.forecast)
3
>>> kjfk.data.forecast[0].flight_rules
'VFR'
>>> kjfk.translations.forecast[0].wind
'NNW-330 at 16kt gusting to 27kt'
>>> kjfk.speech
'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'

The parse and from_report methods can parse a report string if you want to override the normal fetching process.

>>> from avwx import Taf
>>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
>>> zyhb = Taf.from_report(report)
True
>>> zyhb.station.city
'Hulan'
>>> zyhb.data.remarks
'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
>>> zyhb.summary[-1]
'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
data: avwx.structs.TafData | None = None
translations: avwx.structs.TafTrans | None = None
summary: list[str]
101    @property
102    def summary(self) -> list[str]:
103        """Condensed summary for each forecast created from translations."""
104        if not self.translations:
105            self.update()
106        if self.translations is None or self.translations.forecast is None:
107            return []
108        return [summary.taf(trans) for trans in self.translations.forecast]

Condensed summary for each forecast created from translations.

speech: str | None
110    @property
111    def speech(self) -> str | None:
112        """Report summary designed to be read by a text-to-speech program."""
113        if not self.data:
114            self.update()
115        if self.data is None or self.units is None:
116            return None
117        return speech.taf(self.data, self.units)

Report summary designed to be read by a text-to-speech program.

LINE_FIXES = {'TEMP0': 'TEMPO', 'TEMP O': 'TEMPO', 'TMPO': 'TEMPO', 'TE MPO': 'TEMPO', 'TEMP ': 'TEMPO ', 'T EMPO': 'TEMPO', ' EMPO': ' TEMPO', 'TEMO': 'TEMPO', 'BECM G': 'BECMG', 'BEMCG': 'BECMG', 'BE CMG': 'BECMG', 'B ECMG': 'BECMG', ' BEC ': ' BECMG ', 'BCEMG': 'BECMG', 'BEMG': 'BECMG'}
def sanitize_line(txt: str, sans: avwx.structs.Sanitization) -> str:
139def sanitize_line(txt: str, sans: Sanitization) -> str:
140    """Fix common mistakes with 'new line' signifiers so that they can be recognized."""
141    for key, fix in LINE_FIXES.items():
142        if key in txt:
143            txt = txt.replace(key, fix)
144            sans.log(key, fix)
145    # Fix when space is missing following new line signifiers
146    for item in ["BECMG", "TEMPO"]:
147        if item in txt and f"{item} " not in txt:
148            index = txt.find(item) + len(item)
149            txt = f"{txt[:index]} {txt[index:]}"
150            sans.extra_spaces_needed = True
151    return txt

Fix common mistakes with 'new line' signifiers so that they can be recognized.

def get_taf_remarks(txt: str) -> tuple[str, str]:
154def get_taf_remarks(txt: str) -> tuple[str, str]:
155    """Return report and remarks separated if found."""
156    remarks_start = core.find_first_in_list(txt, TAF_RMK)
157    if remarks_start == -1:
158        return txt, ""
159    remarks = txt[remarks_start:]
160    txt = txt[:remarks_start].strip()
161    return txt, remarks

Return report and remarks separated if found.

def get_alt_ice_turb( data: list[str]) -> tuple[list[str], avwx.structs.Number | None, list[str], list[str]]:
164def get_alt_ice_turb(
165    data: list[str],
166) -> tuple[list[str], Number | None, list[str], list[str]]:
167    """Return the report list and removed: Altimeter string, Icing list, Turbulence list."""
168    altimeter_number = None
169    icing, turbulence = [], []
170    for i, item in reversed(list(enumerate(data))):
171        if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit():
172            altimeter = data.pop(i)[3:7]
173            if altimeter[0] in ("2", "3"):
174                altimeter = f"{altimeter[:2]}.{altimeter[2:]}"
175            altimeter_number = core.make_number(altimeter, literal=True)
176        elif item.isdigit():
177            if item[0] == "6":
178                icing.append(data.pop(i))
179            elif item[0] == "5":
180                turbulence.append(data.pop(i))
181    return data, altimeter_number, icing, turbulence

Return the report list and removed: Altimeter string, Icing list, Turbulence list.

def is_normal_time(item: str) -> bool:
184def is_normal_time(item: str) -> bool:
185    """Return if the item looks like a valid TAF (1200/1400) time range."""
186    return len(item) == 9 and item[4] == "/" and item[:4].isdigit() and item[5:].isdigit()

Return if the item looks like a valid TAF (1200/1400) time range.

def starts_new_line(item: str) -> bool:
    """Return True if the given element should start a new report line."""
    # Exact markers (TEMPO, BECMG, ...) or known prefixes (FM, PROB...)
    return item in TAF_NEWLINE or item.startswith(tuple(TAF_NEWLINE_STARTSWITH))

Returns True if the given element should start a new report line

def split_taf(txt: str) -> list[str]:
    """Split a TAF report into each distinct time period.

    A new period begins at a new-line marker (unless it follows a PROB
    token) or at a bare start/end time range. Returns an empty list for
    an empty report.
    """
    tokens = txt.split()
    # Guard: the previous next(enumerate(...)) pattern raised StopIteration here
    if not tokens:
        return []
    periods: list[str] = []
    last = 0
    # The first token always belongs to the first period, so start at 1
    for i in range(1, len(tokens)):
        token, prior = tokens[i], tokens[i - 1]
        if (starts_new_line(token) and not prior.startswith("PROB")) or (
            is_normal_time(token) and not starts_new_line(prior)
        ):
            periods.append(" ".join(tokens[last:i]))
            last = i
    periods.append(" ".join(tokens[last:]))
    return periods

Split a TAF report into each distinct time period.

def get_type_and_times(
    data: list[str],
) -> tuple[list[str], str, str | None, str | None, str | None]:
    """Extract the report type string, start time string, and end time string.

    Returns the remaining report list plus the line type (default "FROM"),
    start, end, and transition-start strings pulled from its head.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    # TEMPO, BECMG, INTER, or a standalone PROBxx names the line type.
    # Parenthesized so an empty list never reaches data[0] (was an IndexError)
    if data and (data[0] in TAF_NEWLINE or (len(data[0]) == 6 and data[0].startswith("PROB"))):
        report_type = data.pop(0)
    if data:
        item, length = data[0], len(data[0])
        # 1200/1306
        if is_normal_time(item):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        elif len(data) == 8 and length == 4 and len(data[1]) == 4 and item.isdigit() and data[1].isdigit():
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif length == 6 and item.isdigit() and item[-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000 or FM1200/1306
        elif length > 7 and item.startswith("FM"):
            report_type = "FROM"
            if "/" in item and item[2:].split("/")[0].isdigit() and item[2:].split("/")[1].isdigit():
                start_time, end_time = data.pop(0)[2:].split("/")
            elif item[2:8].isdigit():
                start_time = data.pop(0)[2:6]
            # TL120600 may follow an FM item with the end time
            if data and length > 7 and data[0].startswith("TL") and data[0][2:8].isdigit():
                end_time = data.pop(0)[2:6]
        elif report_type == "BECMG" and length == 5:
            # 1200/
            if item[-1] == "/" and item[:4].isdigit():
                start_time = data.pop(0)[:4]
            # /1200
            elif item[0] == "/" and item[1:].isdigit():
                end_time = data.pop(0)[1:]
    # BECMG windows describe a transition: shift start to transition slot
    if report_type == "BECMG":
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition

Extract the report type string, start time string, and end time string.

def find_missing_taf_times(
    lines: list[TafLineData], start: Timestamp | None, end: Timestamp | None
) -> list[TafLineData]:
    """Fix any missing time issues except for error/empty lines.

    The report's overall start stamps the first line, interior lines
    borrow a start/end time from their neighbors, and the report's
    overall end closes the last non-TEMPO/PROB line. Lines are mutated
    in place and the same list is returned.
    """
    if not lines:
        return lines
    # Assign start time
    lines[0].start_time = start
    # Fix other times
    last_fm_line = 0
    for i, line in enumerate(lines):
        # TEMPO/PROB lines carry their own windows; leave them untouched
        if _is_tempo_or_prob(line):
            continue
        last_fm_line = i
        # Search remaining lines to fill empty end or previous for empty start:
        # a missing start is taken from an earlier line's end (direction -1),
        # a missing end from a later line's start (direction +1)
        for target, other, direc in (("start", "end", -1), ("end", "start", 1)):
            target += "_time"  # noqa: PLW2901
            if not getattr(line, target):
                setattr(line, target, _get_next_time(lines[i::direc][1:], f"{other}_time"))
    # Special case for final forecast
    if last_fm_line:
        lines[last_fm_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines

Fix any missing time issues except for error/empty lines.

def get_wind_shear(data: list[str]) -> tuple[list[str], str | None]:
    """Return the report list and the removed wind shear."""
    shear: str | None = None
    # Reverse iteration keeps remaining indexes valid while popping
    for index in range(len(data) - 1, -1, -1):
        code = data[index]
        # WS020/27050KT -> WS020/27050
        if len(code) > 6 and code.startswith("WS") and code[5] == "/":
            shear = data.pop(index).replace("KT", "")
    return data, shear

Return the report list and the removed wind shear.

def get_temp_min_and_max(
    data: list[str],
) -> tuple[list[str], str | None, str | None]:
    """Pull out Max temp at time and Min temp at time items from wx list."""
    temp_max = ""
    temp_min = ""
    # Reverse iteration keeps remaining indexes valid while popping
    for index in range(len(data) - 1, -1, -1):
        code = data[index]
        if not (len(code) > 6 and code[0] == "T" and "/" in code):
            continue
        # TX12/1316Z
        if code[1] == "X":
            temp_max = data.pop(index)
        # TNM03/1404Z
        elif code[1] == "N":
            temp_min = data.pop(index)
        # TM03/1404Z T12/1316Z -> Will fix TN/TX
        elif code[1] == "M" or code[1].isdigit():
            if temp_min:
                # Compare numeric values to decide which one is the max
                prior = int(temp_min[2 : temp_min.find("/")].replace("M", "-"))
                current = int(code[1 : code.find("/")].replace("M", "-"))
                if prior > current:
                    temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{code[1:]}"
                else:
                    temp_max = f"TX{code[1:]}"
            else:
                temp_min = f"TN{code[1:]}"
            data.pop(index)
    return data, temp_max or None, temp_min or None

Pull out Max temp at time and Min temp at time items from wx list.

def get_oceania_temp_and_alt(data: list[str]) -> tuple[list[str], list[str], list[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs."""
    temps: list[str] = []
    alts: list[str] = []
    # "T" and "Q" markers head runs of digit groups to be extracted
    for marker, target in (("T", temps), ("Q", alts)):
        if marker in data:
            data, found = core.get_digit_list(data, data.index(marker))
            target.extend(found)
    return data, temps, alts

Get Temperature and Altimeter lists for Oceania TAFs.

def get_taf_flight_rules(lines: list[TafLineData]) -> list[TafLineData]:
    """Get flight rules by looking for missing data in prior reports.

    Each forecast line may omit visibility or clouds; the most recent
    non-TEMPO/PROB line supplies the missing value before computing the
    flight rules. Lines are mutated in place and the list is returned.
    """
    for i, line in enumerate(lines):
        temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False
        # Walk backwards from the current line to fill missing vis/cloud data
        for report in reversed(lines[: i + 1]):
            if not _is_tempo_or_prob(report):
                if not temp_vis:
                    temp_vis = report.visibility
                # SKC or CLR should force no clouds instead of looking back
                # (also true for CAVOK visibility)
                if "SKC" in report.other or "CLR" in report.other or temp_vis and temp_vis.repr == "CAVOK":
                    is_clear = True
                elif temp_cloud == []:
                    # An empty list means "not found yet"; other falsy values stand
                    temp_cloud = report.clouds
                if temp_vis and temp_cloud != []:
                    break
        if is_clear:
            temp_cloud = []
        line.flight_rules = FLIGHT_RULES[core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))]
    return lines

Get flight rules by looking for missing data in prior reports.

def fix_report_header(report: str) -> str:
    """Correct the header order for key elements."""
    tokens = report.split()

    # Limit scope to only the first few elements. Remarks may include similar tokens
    scope = min(len(tokens), 6)
    header, body = tokens[:scope], tokens[scope:]

    # Move TAF / AMD / COR to the front, in that order, when present
    ordered = []
    for keyword in ("TAF", "AMD", "COR"):
        if keyword in header:
            header.remove(keyword)
            ordered.append(keyword)

    return " ".join(ordered + header + body)

Correct the header order for key elements.

def parse(
    station: str, report: str, issued: date | None = None
) -> tuple[TafData | None, Units | None, Sanitization | None]:
    """Return TafData and Units dataclasses with parsed data and their associated units.

    `station` is the reporting ident, `report` the raw TAF text, and
    `issued` anchors relative timestamps to a calendar date. Returns
    (None, None, None) when the report is empty. Raises if `station`
    fails validation.
    """
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    # Strip leading TAF / AMD / COR markers now at the head of the report
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        report = report[4:]
    start_time: Timestamp | None = None
    end_time: Timestamp | None = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    # Only the head of the report can hold the station ident and issue time
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    # Remove the station/time so line parsing sees only forecast elements
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: str | None = None
    min_temp: str | None = None
    if parsed_lines:
        # Temperature extremes usually trail the last line but may lead the first
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: list[str] | None = None
    temps: list[str] | None = None
    # NOTE(review): idents starting with "A" are treated as Oceania stations — confirm
    if station[0] == "A":
        (
            parsed_lines[-1].other,
            alts,
            temps,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    # Restore the station and time to the sanitized report string
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans

Return TafData and Units dataclasses with parsed data and their associated units.

def parse_lines( lines: list[str], units: avwx.structs.Units, sans: avwx.structs.Sanitization, issued: datetime.date | None = None) -> list[avwx.structs.TafLineData]:
476def parse_lines(lines: list[str], units: Units, sans: Sanitization, issued: date | None = None) -> list[TafLineData]:
477    """Return a list of parsed line dictionaries."""
478    parsed_lines: list[TafLineData] = []
479    prob = ""
480    while lines:
481        raw_line = lines[0].strip()
482        line = sanitize_line(raw_line, sans)
483        # Remove prob from the beginning of a line
484        if line.startswith("PROB"):
485            # Add standalone prob to next line
486            if len(line) == 6:
487                prob = line
488                line = ""
489            # Add to current line
490            elif len(line) > 6:
491                prob = line[:6]
492                line = line[6:].strip()
493        if line:
494            parsed_line = parse_line(line, units, sans, issued)
495            parsed_line.probability = None if " " in prob else core.make_number(prob[4:])
496            parsed_line.raw = raw_line
497            if prob:
498                parsed_line.sanitized = f"{prob} {parsed_line.sanitized}"
499            prob = ""
500            parsed_lines.append(parsed_line)
501        lines.pop(0)
502    return parsed_lines

Return a list of parsed line dictionaries.

def parse_line( line: str, units: avwx.structs.Units, sans: avwx.structs.Sanitization, issued: datetime.date | None = None) -> avwx.structs.TafLineData:
505def parse_line(line: str, units: Units, sans: Sanitization, issued: date | None = None) -> TafLineData:
506    """Parser for the International TAF forcast variant."""
507    data: list[str] = core.dedupe(line.split())
508    # Grab original time piece under certain conditions to preserve a useful slash
509    old_time = data[1] if len(data) > 1 and _is_possible_start_end_time_slash(data[1]) else None
510    data = clean_taf_list(data, sans)
511    if old_time and len(data) > 1 and data[1] == old_time.strip("/"):
512        data[1] = old_time
513    sanitized = " ".join(data)
514    data, report_type, start_time, end_time, transition = get_type_and_times(data)
515    data, wind_shear = get_wind_shear(data)
516    (
517        data,
518        wind_direction,
519        wind_speed,
520        wind_gust,
521        wind_variable_direction,
522    ) = core.get_wind(data, units)
523    if "CAVOK" in data:
524        visibility = core.make_number("CAVOK")
525        clouds: list[Cloud] = []
526        data.pop(data.index("CAVOK"))
527    else:
528        data, visibility = core.get_visibility(data, units)
529        data, clouds = core.get_clouds(data)
530    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
531    return TafLineData(
532        altimeter=altimeter,
533        clouds=clouds,
534        flight_rules="",
535        other=other,
536        visibility=visibility,
537        wind_direction=wind_direction,
538        wind_gust=wind_gust,
539        wind_speed=wind_speed,
540        wx_codes=[],
541        end_time=core.make_timestamp(end_time, target_date=issued),
542        icing=icing,
543        probability=None,
544        raw=line,
545        sanitized=sanitized,
546        start_time=core.make_timestamp(start_time, target_date=issued),
547        transition_start=core.make_timestamp(transition, target_date=issued),
548        turbulence=turbulence,
549        type=report_type,
550        wind_shear=wind_shear,
551        wind_variable_direction=wind_variable_direction,
552    )

Parser for the International TAF forecast variant.