avwx.current.taf

A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area 5 statute miles from the reporting station. They are updated once every three or six hours or when significant changes warrant an update, and the observations are valid for six hours or until the next report is issued

  1"""
  2A TAF (Terminal Aerodrome Forecast) is a 24-hour weather forecast for the area
  35 statute miles from the reporting station. They are updated once every three or
  4six hours or when significant changes warrant an update, and the observations
  5are valid for six hours or until the next report is issued
  6"""
  7
  8# stdlib
  9from contextlib import suppress
 10from datetime import date
 11from typing import List, Tuple, Optional
 12
 13# module
 14from avwx.current.base import Report, get_wx_codes
 15from avwx.parsing import core, speech, summary
 16from avwx.parsing.remarks import parse as parse_remarks
 17from avwx.parsing.sanitization.taf import clean_taf_list, clean_taf_string
 18from avwx.parsing.translate.taf import translate_taf
 19from avwx.static.core import FLIGHT_RULES
 20from avwx.static.taf import TAF_RMK, TAF_NEWLINE, TAF_NEWLINE_STARTSWITH
 21from avwx.station import uses_na_format, valid_station
 22from avwx.structs import (
 23    Cloud,
 24    Number,
 25    Sanitization,
 26    TafData,
 27    TafLineData,
 28    TafTrans,
 29    Timestamp,
 30    Units,
 31)
 32
 33
class Taf(Report):
    """
    The Taf class offers an object-oriented approach to managing TAF data for a
    single station.

    ```python
    >>> from avwx import Taf
    >>> kjfk = Taf("KJFK")
    >>> kjfk.station.name
    'John F Kennedy International Airport'
    >>> kjfk.update()
    True
    >>> kjfk.last_updated
    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
    >>> kjfk.raw
    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
    >>> len(kjfk.data.forecast)
    3
    >>> kjfk.data.forecast[0].flight_rules
    'VFR'
    >>> kjfk.translations.forecast[0].wind
    'NNW-330 at 16kt gusting to 27kt'
    >>> kjfk.speech
    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
    ```

    The `parse` and `from_report` methods can parse a report string if you want
    to override the normal fetching process.

    ```python
    >>> from avwx import Taf
    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
    >>> zyhb = Taf.from_report(report)
    True
    >>> zyhb.station.city
    'Hulan'
    >>> zyhb.data.remarks
    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
    >>> zyhb.summary[-1]
    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
    ```
    """

    # Parsed forecast data; populated by _post_update/_post_parse
    data: Optional[TafData] = None
    # Translated forecast strings; set only after a successful parse
    translations: Optional[TafTrans] = None  # type: ignore

    async def _post_update(self) -> None:
        # Runs after a fetch: parse the raw report and build translations.
        # Bails out early if the fetch produced no code/raw text.
        if self.code is None or self.raw is None:
            return
        self.data, self.units, self.sanitization = parse(
            self.code, self.raw, self.issued
        )
        if self.data is None or self.units is None:
            return
        self.translations = translate_taf(self.data, self.units)

    def _post_parse(self) -> None:
        # Synchronous counterpart of _post_update used when a report string
        # is supplied directly (e.g. via from_report)
        if self.code is None or self.raw is None:
            return
        self.data, self.units, self.sanitization = parse(
            self.code, self.raw, self.issued
        )
        if self.data is None or self.units is None:
            return
        self.translations = translate_taf(self.data, self.units)

    @property
    def summary(self) -> List[str]:
        """Condensed summary for each forecast created from translations"""
        # Lazily fetch/parse if translations are not yet available
        if not self.translations:
            self.update()
        if self.translations is None or self.translations.forecast is None:
            return []
        return [summary.taf(trans) for trans in self.translations.forecast]

    @property
    def speech(self) -> Optional[str]:
        """Report summary designed to be read by a text-to-speech program"""
        # Lazily fetch/parse if data is not yet available
        if not self.data:
            self.update()
        if self.data is None or self.units is None:
            return None
        return speech.taf(self.data, self.units)
117
118
# Common misspellings/mis-spacings of the TEMPO and BECMG new-line signifiers
# mapped to their corrected form; applied verbatim by sanitize_line
LINE_FIXES = {
    "TEMP0": "TEMPO",
    "TEMP O": "TEMPO",
    "TMPO": "TEMPO",
    "TE MPO": "TEMPO",
    "TEMP ": "TEMPO ",
    "T EMPO": "TEMPO",
    " EMPO": " TEMPO",
    "TEMO": "TEMPO",
    "BECM G": "BECMG",
    "BEMCG": "BECMG",
    "BE CMG": "BECMG",
    "B ECMG": "BECMG",
    " BEC ": " BECMG ",
    "BCEMG": "BECMG",
    "BEMG": "BECMG",
}
136
137
def sanitize_line(txt: str, sans: Sanitization) -> str:
    """Fixes common mistakes with 'new line' signifiers so that they can be recognized"""
    # Apply known typo corrections, logging each one that fires
    for mistake, correction in LINE_FIXES.items():
        if mistake not in txt:
            continue
        txt = txt.replace(mistake, correction)
        sans.log(mistake, correction)
    # Insert a space when one is missing right after a new-line signifier
    for signifier in ["BECMG", "TEMPO"]:
        if signifier not in txt or f"{signifier} " in txt:
            continue
        cut = txt.find(signifier) + len(signifier)
        txt = f"{txt[:cut]} {txt[cut:]}"
        sans.extra_spaces_needed = True
    return txt
151
152
def get_taf_remarks(txt: str) -> Tuple[str, str]:
    """Returns report and remarks separated if found"""
    start = core.find_first_in_list(txt, TAF_RMK)
    # No remarks signifier found: the whole text is the report
    if start == -1:
        return txt, ""
    return txt[:start].strip(), txt[start:]
161
162
def get_alt_ice_turb(
    data: List[str],
) -> Tuple[List[str], Optional[Number], List[str], List[str]]:
    """Returns the report list and removed: Altimeter string, Icing list, Turbulence list"""
    altimeter_number = None
    icing, turbulence = [], []
    # Iterate in reverse so pop(i) never shifts indexes we have yet to visit
    for i, item in reversed(list(enumerate(data))):
        # QNH-prefixed altimeter group, e.g. QNH2992INS
        if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit():
            altimeter = data.pop(i)[3:7]
            # Leading 2/3 values get a decimal inserted (2992 -> 29.92);
            # presumably these are inHg values -- confirm against spec
            if altimeter[0] in ("2", "3"):
                altimeter = f"{altimeter[:2]}.{altimeter[2:]}"
            altimeter_number = core.make_number(altimeter, literal=True)
        elif item.isdigit():
            # Purely numeric groups starting with 6 are icing codes
            if item[0] == "6":
                icing.append(data.pop(i))
            # Purely numeric groups starting with 5 are turbulence codes
            elif item[0] == "5":
                turbulence.append(data.pop(i))
    return data, altimeter_number, icing, turbulence
181
182
def starts_new_line(item: str) -> bool:
    """Returns True if the given element should start a new report line"""
    # Exact signifier match or one of the known signifier prefixes
    return item in TAF_NEWLINE or item.startswith(tuple(TAF_NEWLINE_STARTSWITH))
188
189
def split_taf(txt: str) -> List[str]:
    """Splits a TAF report into each distinct time period"""
    tokens = txt.split()
    periods = []
    start = 0
    for index, token in enumerate(tokens):
        if not starts_new_line(token) or index == 0:
            continue
        # Keep a PROBnn token attached to the line it modifies
        if tokens[index - 1].startswith("PROB"):
            continue
        periods.append(" ".join(tokens[start:index]))
        start = index
    # Remaining tokens form the final period
    periods.append(" ".join(tokens[start:]))
    return periods
201
202
# TAF line report type and start/end times
def get_type_and_times(
    data: List[str],
) -> Tuple[List[str], str, Optional[str], Optional[str], Optional[str]]:
    """Returns the report list and removed:

    Report type string, start time string, end time string, transition start string
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    if data:
        # TEMPO, BECMG, INTER
        if data[0] in TAF_NEWLINE:
            report_type = data.pop(0)
        # PROB[30,40]
        elif len(data[0]) == 6 and data[0].startswith("PROB"):
            report_type = data.pop(0)
    if data:
        # 1200/1306
        if (
            len(data[0]) == 9
            and data[0][4] == "/"
            and data[0][:4].isdigit()
            and data[0][5:].isdigit()
        ):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        # NOTE(review): the len(data) == 8 guard ties this branch to an exact
        # remaining-token count -- confirm this is intentional and not a bug
        elif (
            len(data) == 8
            and len(data[0]) == 4
            and len(data[1]) == 4
            and data[0].isdigit()
            and data[1].isdigit()
        ):
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif len(data[0]) == 6 and data[0].isdigit() and data[0][-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif len(data[0]) > 7 and data[0].startswith("FM"):
            report_type = "FROM"
            # FM with an embedded start/end pair, e.g. FM1200/1306
            if (
                "/" in data[0]
                and data[0][2:].split("/")[0].isdigit()
                and data[0][2:].split("/")[1].isdigit()
            ):
                start_time, end_time = data.pop(0)[2:].split("/")
            elif data[0][2:8].isdigit():
                start_time = data.pop(0)[2:6]
            # TL120600
            if (
                data
                and len(data[0]) > 7
                and data[0].startswith("TL")
                and data[0][2:8].isdigit()
            ):
                end_time = data.pop(0)[2:6]
    # For BECMG lines the parsed window is the transition period:
    # shift start -> transition and end -> start
    if report_type == "BECMG":
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition
265
266
def _is_tempo_or_prob(line: TafLineData) -> bool:
    """Returns True if report type is TEMPO or non-null probability"""
    if line.type == "TEMPO":
        return True
    return line.probability is not None
270
271
def _get_next_time(lines: List[TafLineData], target: str) -> Optional[Timestamp]:
    """Returns the next normal time target value or empty"""
    for candidate in lines:
        # Skip TEMPO/PROB lines; only "normal" lines carry usable times
        if _is_tempo_or_prob(candidate):
            continue
        value = getattr(candidate, target)
        # A transition start takes precedence when looking for a start time
        if target == "start_time":
            value = candidate.transition_start or value
        if value:
            return value
    return None
284
285
def find_missing_taf_times(
    lines: List[TafLineData], start: Optional[Timestamp], end: Optional[Timestamp]
) -> List[TafLineData]:
    """Fix any missing time issues (except for error/empty lines)"""
    if not lines:
        return lines
    # Assign start time
    lines[0].start_time = start
    # Fix other times
    last_fm_line = 0
    for i, line in enumerate(lines):
        # TEMPO/PROB lines keep their own (possibly empty) times
        if _is_tempo_or_prob(line):
            continue
        last_fm_line = i
        # Search remaining lines to fill empty end or previous for empty start
        for target, other, direc in (("start", "end", -1), ("end", "start", 1)):
            target += "_time"
            if not getattr(line, target):
                setattr(
                    line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")
                )
    # Special case for final forecast
    # NOTE(review): falsy when the only FROM line is index 0; the fallback
    # below then restores lines[0].end_time -- confirm this is intended
    if last_fm_line:
        lines[last_fm_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines
314
315
def get_wind_shear(data: List[str]) -> Tuple[List[str], Optional[str]]:
    """Returns the report list and the removed wind shear string"""
    shear = None
    # Walk backwards so popping never shifts unvisited indexes
    for index in range(len(data) - 1, -1, -1):
        candidate = data[index]
        # Wind shear groups look like WS020/13045KT
        if len(candidate) > 6 and candidate.startswith("WS") and candidate[5] == "/":
            shear = data.pop(index).replace("KT", "")
    return data, shear
323
324
def get_temp_min_and_max(
    data: List[str],
) -> Tuple[List[str], Optional[str], Optional[str]]:
    """Pull out Max temp at time and Min temp at time items from wx list

    Returns the report list and the removed TX (max) and TN (min) strings
    """
    temp_max, temp_min = "", ""
    # Iterate in reverse so pop(i) never shifts unvisited indexes
    for i, item in reversed(list(enumerate(data))):
        if len(item) > 6 and item[0] == "T" and "/" in item:
            # TX12/1316Z
            if item[1] == "X":
                temp_max = data.pop(i)
            # TNM03/1404Z
            elif item[1] == "N":
                temp_min = data.pop(i)
            # TM03/1404Z T12/1316Z -> Will fix TN/TX
            # Ambiguous T group (no X/N marker): decide min vs max by value
            elif item[1] == "M" or item[1].isdigit():
                if temp_min:
                    # 'M' prefix encodes a negative temperature value
                    if int(temp_min[2 : temp_min.find("/")].replace("M", "-")) > int(
                        item[1 : item.find("/")].replace("M", "-")
                    ):
                        # Stored min is warmer than this item: swap roles
                        temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}"
                    else:
                        temp_max = f"TX{item[1:]}"
                else:
                    # First ambiguous group is provisionally the min
                    temp_min = f"TN{item[1:]}"
                data.pop(i)
    return data, temp_max or None, temp_min or None
351
352
def get_oceania_temp_and_alt(data: List[str]) -> Tuple[List[str], List[str], List[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs"""
    temperature_items: List[str] = []
    altimeter_items: List[str] = []
    # A bare "T" marker precedes the temperature digit groups
    if "T" in data:
        data, temperature_items = core.get_digit_list(data, data.index("T"))
    # A bare "Q" marker precedes the altimeter digit groups
    if "Q" in data:
        data, altimeter_items = core.get_digit_list(data, data.index("Q"))
    return data, temperature_items, altimeter_items
362
363
def get_taf_flight_rules(lines: List[TafLineData]) -> List[TafLineData]:
    """Get flight rules by looking for missing data in prior reports"""
    for i, line in enumerate(lines):
        temp_vis, temp_cloud, is_clear = line.visibility, line.clouds, False
        # Walk backwards through this and earlier lines to inherit
        # visibility/cloud values that this line does not carry itself
        for report in reversed(lines[: i + 1]):
            if not _is_tempo_or_prob(report):
                if not temp_vis:
                    temp_vis = report.visibility
                # SKC or CLR should force no clouds instead of looking back
                if "SKC" in report.other or "CLR" in report.other:
                    is_clear = True
                elif temp_vis and temp_vis.repr == "CAVOK":
                    is_clear = True
                elif temp_cloud == []:
                    temp_cloud = report.clouds
                # Stop once both visibility and clouds are resolved
                if temp_vis and temp_cloud != []:
                    break
        if is_clear:
            temp_cloud = []
        line.flight_rules = FLIGHT_RULES[
            core.get_flight_rules(temp_vis, core.get_ceiling(temp_cloud))
        ]
    return lines
387
388
def fix_report_header(report: str) -> str:
    """Corrects the header order for key elements"""
    tokens = report.split()

    # Only inspect the first few tokens; remarks may contain similar words
    scope = min(len(tokens), 6)
    head = tokens[:scope]
    tail = tokens[scope:]

    # Pull the known header keywords to the front in canonical order
    ordered = []
    for keyword in ("TAF", "AMD", "COR"):
        if keyword in head:
            head.remove(keyword)
            ordered.append(keyword)

    return " ".join(ordered + head + tail)
404
405
def parse(
    station: str, report: str, issued: Optional[date] = None
) -> Tuple[Optional[TafData], Optional[Units], Optional[Sanitization]]:
    """Returns TafData and Units dataclasses with parsed data and their associated units

    Returns (None, None, None) when the report string is empty
    """
    # pylint: disable=too-many-locals
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    # Strip leading report-type tokens so only the forecast body remains
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        report = report[4:]
    start_time: Optional[Timestamp] = None
    end_time: Optional[Timestamp] = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    # Station ident and issue time appear within the first few tokens
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: Optional[str] = None
    min_temp: Optional[str] = None
    if parsed_lines:
        # TX/TN groups usually trail the report; fall back to the first line
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: Optional[List[str]] = None
    temps: Optional[List[str]] = None
    # Guard against an empty forecast list before indexing parsed_lines[-1]
    if parsed_lines and station[0] == "A":
        (
            parsed_lines[-1].other,
            alts,
            temps,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans
482
483
def parse_lines(
    lines: List[str], units: Units, sans: Sanitization, issued: Optional[date] = None
) -> List[TafLineData]:
    """Returns a list of parsed line dictionaries"""
    parsed_lines: List[TafLineData] = []
    # Holds a PROBnn token carried over from a standalone prob line
    prob = ""
    while lines:
        raw_line = lines[0].strip()
        line = sanitize_line(raw_line, sans)
        # Remove prob from the beginning of a line
        if line.startswith("PROB"):
            # Add standalone prob to next line
            if len(line) == 6:
                prob = line
                line = ""
            # Add to current line
            elif len(line) > 6:
                prob = line[:6]
                line = line[6:].strip()
        if line:
            parsed_line = parse_line(line, units, sans, issued)
            # A space means the first 6 chars were not a clean PROBnn token,
            # so no probability value can be extracted from it
            parsed_line.probability = (
                None if " " in prob else core.make_number(prob[4:])
            )
            parsed_line.raw = raw_line
            if prob:
                # Re-attach the prob token to the sanitized line text
                parsed_line.sanitized = f"{prob} {parsed_line.sanitized}"
            prob = ""
            parsed_lines.append(parsed_line)
        lines.pop(0)
    return parsed_lines
515
516
def parse_line(
    line: str, units: Units, sans: Sanitization, issued: Optional[date] = None
) -> TafLineData:
    """Parser for the International TAF forecast variant"""
    # pylint: disable=too-many-locals
    data = core.dedupe(line.split())
    data = clean_taf_list(data, sans)
    sanitized = " ".join(data)
    # Strip off line type, time window, wind shear, and wind groups in turn
    data, report_type, start_time, end_time, transition = get_type_and_times(data)
    data, wind_shear = get_wind_shear(data)
    (
        data,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(data, units)
    # CAVOK: no cloud layers; visibility stored as the literal "CAVOK"
    if "CAVOK" in data:
        visibility = core.make_number("CAVOK")
        clouds: List[Cloud] = []
        data.pop(data.index("CAVOK"))
    else:
        data, visibility = core.get_visibility(data, units)
        data, clouds = core.get_clouds(data)
    # Whatever remains after altimeter/icing/turbulence is "other"
    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=core.make_timestamp(end_time, target_date=issued),
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=core.make_timestamp(start_time, target_date=issued),
        transition_start=core.make_timestamp(transition, target_date=issued),
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )
class Taf(avwx.current.base.Report):
 35class Taf(Report):
 36    """
 37    The Taf class offers an object-oriented approach to managing TAF data for a
 38    single station.
 39
 40    ```python
 41    >>> from avwx import Taf
 42    >>> kjfk = Taf("KJFK")
 43    >>> kjfk.station.name
 44    'John F Kennedy International Airport'
 45    >>> kjfk.update()
 46    True
 47    >>> kjfk.last_updated
 48    datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
 49    >>> kjfk.raw
 50    'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
 51    >>> len(kjfk.data.forecast)
 52    3
 53    >>> kjfk.data.forecast[0].flight_rules
 54    'VFR'
 55    >>> kjfk.translations.forecast[0].wind
 56    'NNW-330 at 16kt gusting to 27kt'
 57    >>> kjfk.speech
 58    'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'
 59    ```
 60
 61    The `parse` and `from_report` methods can parse a report string if you want
 62    to override the normal fetching process.
 63
 64    ```python
 65    >>> from avwx import Taf
 66    >>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
 67    >>> zyhb = Taf.from_report(report)
 68    True
 69    >>> zyhb.station.city
 70    'Hulan'
 71    >>> zyhb.data.remarks
 72    'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
 73    >>> zyhb.summary[-1]
 74    'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
 75    ```
 76    """
 77
 78    data: Optional[TafData] = None
 79    translations: Optional[TafTrans] = None  # type: ignore
 80
 81    async def _post_update(self) -> None:
 82        if self.code is None or self.raw is None:
 83            return
 84        self.data, self.units, self.sanitization = parse(
 85            self.code, self.raw, self.issued
 86        )
 87        if self.data is None or self.units is None:
 88            return
 89        self.translations = translate_taf(self.data, self.units)
 90
 91    def _post_parse(self) -> None:
 92        if self.code is None or self.raw is None:
 93            return
 94        self.data, self.units, self.sanitization = parse(
 95            self.code, self.raw, self.issued
 96        )
 97        if self.data is None or self.units is None:
 98            return
 99        self.translations = translate_taf(self.data, self.units)
100
101    @property
102    def summary(self) -> List[str]:
103        """Condensed summary for each forecast created from translations"""
104        if not self.translations:
105            self.update()
106        if self.translations is None or self.translations.forecast is None:
107            return []
108        return [summary.taf(trans) for trans in self.translations.forecast]
109
110    @property
111    def speech(self) -> Optional[str]:
112        """Report summary designed to be read by a text-to-speech program"""
113        if not self.data:
114            self.update()
115        if self.data is None or self.units is None:
116            return None
117        return speech.taf(self.data, self.units)

The Taf class offers an object-oriented approach to managing TAF data for a single station.

>>> from avwx import Taf
>>> kjfk = Taf("KJFK")
>>> kjfk.station.name
'John F Kennedy International Airport'
>>> kjfk.update()
True
>>> kjfk.last_updated
datetime.datetime(2018, 3, 4, 23, 43, 26, 209644, tzinfo=datetime.timezone.utc)
>>> kjfk.raw
'KJFK 042030Z 0421/0524 33016G27KT P6SM BKN045 FM051600 36016G22KT P6SM BKN040 FM052100 35013KT P6SM SCT035'
>>> len(kjfk.data.forecast)
3
>>> kjfk.data.forecast[0].flight_rules
'VFR'
>>> kjfk.translations.forecast[0].wind
'NNW-330 at 16kt gusting to 27kt'
>>> kjfk.speech
'Starting on March 4th - From 21 to 16 zulu, Winds three three zero at 16kt gusting to 27kt. Visibility greater than six miles. Broken layer at 4500ft. From 16 to 21 zulu, Winds three six zero at 16kt gusting to 22kt. Visibility greater than six miles. Broken layer at 4000ft. From 21 to midnight zulu, Winds three five zero at 13kt. Visibility greater than six miles. Scattered clouds at 3500ft'

The parse and from_report methods can parse a report string if you want to override the normal fetching process.

>>> from avwx import Taf
>>> report = "TAF ZYHB 082300Z 0823/0911 VRB03KT 9999 SCT018 BKN120 TX14/0907Z TN04/0921Z FM090100 09015KT 9999 -SHRA WS020/13045KT SCT018 BKN120 BECMG 0904/0906 34008KT PROB30 TEMPO 0906/0911 7000 -RA SCT020 650104 530804 RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z"
>>> zyhb = Taf.from_report(report)
True
>>> zyhb.station.city
'Hulan'
>>> zyhb.data.remarks
'RMK FCST BASED ON AUTO OBS. NXT FCST BY 090600Z'
>>> zyhb.summary[-1]
'Vis 7km, Light Rain, Scattered clouds at 2000ft, Frequent moderate turbulence in clear air from 8000ft to 12000ft, Moderate icing in clouds from 1000ft to 5000ft'
data: Optional[avwx.structs.TafData] = None
translations: Optional[avwx.structs.TafTrans] = None
summary: List[str]

Condensed summary for each forecast created from translations

speech: Optional[str]

Report summary designed to be read by a text-to-speech program

LINE_FIXES = {'TEMP0': 'TEMPO', 'TEMP O': 'TEMPO', 'TMPO': 'TEMPO', 'TE MPO': 'TEMPO', 'TEMP ': 'TEMPO ', 'T EMPO': 'TEMPO', ' EMPO': ' TEMPO', 'TEMO': 'TEMPO', 'BECM G': 'BECMG', 'BEMCG': 'BECMG', 'BE CMG': 'BECMG', 'B ECMG': 'BECMG', ' BEC ': ' BECMG ', 'BCEMG': 'BECMG', 'BEMG': 'BECMG'}
def sanitize_line(txt: str, sans: avwx.structs.Sanitization) -> str:
139def sanitize_line(txt: str, sans: Sanitization) -> str:
140    """Fixes common mistakes with 'new line' signifiers so that they can be recognized"""
141    for key, fix in LINE_FIXES.items():
142        if key in txt:
143            txt = txt.replace(key, fix)
144            sans.log(key, fix)
145    # Fix when space is missing following new line signifiers
146    for item in ["BECMG", "TEMPO"]:
147        if item in txt and f"{item} " not in txt:
148            index = txt.find(item) + len(item)
149            txt = f"{txt[:index]} {txt[index:]}"
150            sans.extra_spaces_needed = True
151    return txt

Fixes common mistakes with 'new line' signifiers so that they can be recognized

def get_taf_remarks(txt: str) -> Tuple[str, str]:
154def get_taf_remarks(txt: str) -> Tuple[str, str]:
155    """Returns report and remarks separated if found"""
156    remarks_start = core.find_first_in_list(txt, TAF_RMK)
157    if remarks_start == -1:
158        return txt, ""
159    remarks = txt[remarks_start:]
160    txt = txt[:remarks_start].strip()
161    return txt, remarks

Returns report and remarks separated if found

def get_alt_ice_turb( data: List[str]) -> Tuple[List[str], Optional[avwx.structs.Number], List[str], List[str]]:
164def get_alt_ice_turb(
165    data: List[str],
166) -> Tuple[List[str], Optional[Number], List[str], List[str]]:
167    """Returns the report list and removed: Altimeter string, Icing list, Turbulence list"""
168    altimeter_number = None
169    icing, turbulence = [], []
170    for i, item in reversed(list(enumerate(data))):
171        if len(item) > 6 and item.startswith("QNH") and item[3:7].isdigit():
172            altimeter = data.pop(i)[3:7]
173            if altimeter[0] in ("2", "3"):
174                altimeter = f"{altimeter[:2]}.{altimeter[2:]}"
175            altimeter_number = core.make_number(altimeter, literal=True)
176        elif item.isdigit():
177            if item[0] == "6":
178                icing.append(data.pop(i))
179            elif item[0] == "5":
180                turbulence.append(data.pop(i))
181    return data, altimeter_number, icing, turbulence

Returns the report list and removed: Altimeter string, Icing list, Turbulence list

def starts_new_line(item: str) -> bool:
184def starts_new_line(item: str) -> bool:
185    """Returns True if the given element should start a new report line"""
186    if item in TAF_NEWLINE:
187        return True
188    return any(item.startswith(start) for start in TAF_NEWLINE_STARTSWITH)

Returns True if the given element should start a new report line

def split_taf(txt: str) -> List[str]:
191def split_taf(txt: str) -> List[str]:
192    """Splits a TAF report into each distinct time period"""
193    lines = []
194    split = txt.split()
195    last_index = 0
196    for i, item in enumerate(split):
197        if starts_new_line(item) and i != 0 and not split[i - 1].startswith("PROB"):
198            lines.append(" ".join(split[last_index:i]))
199            last_index = i
200    lines.append(" ".join(split[last_index:]))
201    return lines

Splits a TAF report into each distinct time period

def get_type_and_times(
    data: List[str],
) -> Tuple[List[str], str, Optional[str], Optional[str], Optional[str]]:
    """Returns the report list and removed:

    Report type string, start time string, end time string, transition start string

    The type defaults to "FROM". Times are returned as raw 4-digit (DDHH)
    strings, not Timestamps. The transition value is only set for BECMG lines.
    """
    report_type, start_time, end_time, transition = "FROM", None, None, None
    if data:
        # TEMPO, BECMG, INTER
        if data[0] in TAF_NEWLINE:
            report_type = data.pop(0)
        # PROB[30,40]
        elif len(data[0]) == 6 and data[0].startswith("PROB"):
            report_type = data.pop(0)
    if data:
        # 1200/1306
        if (
            len(data[0]) == 9
            and data[0][4] == "/"
            and data[0][:4].isdigit()
            and data[0][5:].isdigit()
        ):
            start_time, end_time = data.pop(0).split("/")

        # 1200 1306
        # NOTE(review): the len(data) == 8 guard appears to key on a fixed
        # line length for this legacy two-token form — confirm against samples
        elif (
            len(data) == 8
            and len(data[0]) == 4
            and len(data[1]) == 4
            and data[0].isdigit()
            and data[1].isdigit()
        ):
            start_time = data.pop(0)
            end_time = data.pop(0)

        # 120000
        elif len(data[0]) == 6 and data[0].isdigit() and data[0][-2:] == "00":
            start_time = data.pop(0)[:4]
        # FM120000
        elif len(data[0]) > 7 and data[0].startswith("FM"):
            report_type = "FROM"
            # FM1200/1306 carries both a start and an end time
            if (
                "/" in data[0]
                and data[0][2:].split("/")[0].isdigit()
                and data[0][2:].split("/")[1].isdigit()
            ):
                start_time, end_time = data.pop(0)[2:].split("/")
            elif data[0][2:8].isdigit():
                start_time = data.pop(0)[2:6]
            # TL120600
            if (
                data
                and len(data[0]) > 7
                and data[0].startswith("TL")
                and data[0][2:8].isdigit()
            ):
                end_time = data.pop(0)[2:6]
    # BECMG lines transition between conditions: the parsed window is the
    # transition period, so shift it into the transition/start slots
    if report_type == "BECMG":
        transition, start_time, end_time = start_time, end_time, None
    return data, report_type, start_time, end_time, transition

def find_missing_taf_times(
    lines: List[TafLineData], start: Optional[Timestamp], end: Optional[Timestamp]
) -> List[TafLineData]:
    """Fix any missing time issues (except for error/empty lines)

    Mutates the line objects in place and returns the same list. TEMPO/PROB
    lines are skipped: they overlay a base FROM line rather than advancing
    the forecast timeline.
    """
    if not lines:
        return lines
    # Assign start time
    lines[0].start_time = start
    # Fix other times
    last_fm_line = 0
    for i, line in enumerate(lines):
        if _is_tempo_or_prob(line):
            continue
        last_fm_line = i
        # Search remaining lines to fill empty end or previous for empty start.
        # A missing start is taken from an earlier line's end (direction -1);
        # a missing end from a later line's start (direction 1). The slice
        # lines[i::direc][1:] walks outward from this line, excluding itself.
        for target, other, direc in (("start", "end", -1), ("end", "start", 1)):
            target += "_time"
            if not getattr(line, target):
                setattr(
                    line, target, _get_next_time(lines[i::direc][1:], f"{other}_time")
                )
    # Special case for final forecast: the last base line runs to the report end
    if last_fm_line:
        lines[last_fm_line].end_time = end
    # Reset original end time if still empty
    if lines and not lines[0].end_time:
        lines[0].end_time = end
    return lines

def get_wind_shear(data: List[str]) -> Tuple[List[str], Optional[str]]:
    """Returns the report list and the removed wind shear"""
    shear = None
    # Iterate backwards so index-based pops stay valid
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        # Matches groups like WS020/27020KT; the KT suffix is stripped
        if len(item) > 6 and item.startswith("WS") and item[5] == "/":
            shear = data.pop(index).replace("KT", "")
    return data, shear

def get_temp_min_and_max(
    data: List[str],
) -> Tuple[List[str], Optional[str], Optional[str]]:
    """Pull out Max temp at time and Min temp at time items from wx list"""
    temp_max, temp_min = "", ""
    # Backwards walk keeps indices valid across pops
    for index in range(len(data) - 1, -1, -1):
        item = data[index]
        if not (len(item) > 6 and item[0] == "T" and "/" in item):
            continue
        tag = item[1]
        # TX12/1316Z
        if tag == "X":
            temp_max = data.pop(index)
        # TNM03/1404Z
        elif tag == "N":
            temp_min = data.pop(index)
        # TM03/1404Z T12/1316Z -> untagged pair; assign TN/TX by comparing values
        elif tag == "M" or tag.isdigit():
            if temp_min:
                # "M" prefixes negative temperatures
                existing = int(temp_min[2 : temp_min.find("/")].replace("M", "-"))
                candidate = int(item[1 : item.find("/")].replace("M", "-"))
                if existing > candidate:
                    temp_max, temp_min = f"TX{temp_min[2:]}", f"TN{item[1:]}"
                else:
                    temp_max = f"TX{item[1:]}"
            else:
                temp_min = f"TN{item[1:]}"
            data.pop(index)
    return data, temp_max or None, temp_min or None

def get_oceania_temp_and_alt(data: List[str]) -> Tuple[List[str], List[str], List[str]]:
    """Get Temperature and Altimeter lists for Oceania TAFs

    Returns the remaining report list, then the digit groups following the
    "T" marker (temperatures), then those following "Q" (QNH altimeters).
    """
    temps: List[str] = []
    alts: List[str] = []
    if "T" in data:
        data, temps = core.get_digit_list(data, data.index("T"))
    if "Q" in data:
        data, alts = core.get_digit_list(data, data.index("Q"))
    return data, temps, alts

def get_taf_flight_rules(lines: List[TafLineData]) -> List[TafLineData]:
    """Get flight rules by looking for missing data in prior reports"""
    for index, line in enumerate(lines):
        visibility, clouds = line.visibility, line.clouds
        is_clear = False
        # Scan backwards (starting with this line) through base lines to
        # inherit visibility/cloud data a TEMPO/PROB line leaves out
        for prior in reversed(lines[: index + 1]):
            if _is_tempo_or_prob(prior):
                continue
            if not visibility:
                visibility = prior.visibility
            # SKC or CLR should force no clouds instead of looking back
            if "SKC" in prior.other or "CLR" in prior.other:
                is_clear = True
            elif visibility and visibility.repr == "CAVOK":
                is_clear = True
            elif clouds == []:
                clouds = prior.clouds
            if visibility and clouds != []:
                break
        if is_clear:
            clouds = []
        line.flight_rules = FLIGHT_RULES[
            core.get_flight_rules(visibility, core.get_ceiling(clouds))
        ]
    return lines

def fix_report_header(report: str) -> str:
    """Corrects the header order for key elements"""
    tokens = report.split()

    # Limit scope to only the first few elements. Remarks may include similar tokens
    scope = min(len(tokens), 6)
    headers = tokens[:scope]
    remainder = tokens[scope:]

    # Pull TAF / AMD / COR to the front in that fixed order
    ordered = []
    for key in ("TAF", "AMD", "COR"):
        if key in headers:
            headers.remove(key)
            ordered.append(key)

    return " ".join(ordered + headers + remainder)

def parse(
    station: str, report: str, issued: Optional[date] = None
) -> Tuple[Optional[TafData], Optional[Units], Optional[Sanitization]]:
    """Returns TafData and Units dataclasses with parsed data and their associated units

    Returns (None, None, None) when the report string is empty. May raise
    from valid_station when the station ident is malformed.
    """
    # pylint: disable=too-many-locals
    if not report:
        return None, None, None
    valid_station(station)
    report = fix_report_header(report)
    # Strip leading TAF / AMD / COR tokens now that the header order is fixed
    while len(report) > 3 and report[:4] in ("TAF ", "AMD ", "COR "):
        report = report[4:]
    start_time: Optional[Timestamp] = None
    end_time: Optional[Timestamp] = None
    sans = Sanitization()
    sanitized = clean_taf_string(report, sans)
    # Station ident and issue time should appear within the first 20 characters
    _, new_station, time = core.get_station_and_time(sanitized[:20].split())
    if new_station is not None:
        station = new_station
    sanitized = sanitized.replace(station, "")
    if time:
        sanitized = sanitized.replace(time, "").strip()
    units = Units.north_american() if uses_na_format(station) else Units.international()
    # Find and remove remarks
    sanitized, remarks = get_taf_remarks(sanitized)
    # Split and parse each line
    lines = split_taf(sanitized)
    parsed_lines = parse_lines(lines, units, sans, issued)
    # Perform additional info extract and corrections
    max_temp: Optional[str] = None
    min_temp: Optional[str] = None
    if parsed_lines:
        # Temperature extremes are usually found on the last line; fall back
        # to the first line when the last carries none
        (
            parsed_lines[-1].other,
            max_temp,
            min_temp,
        ) = get_temp_min_and_max(parsed_lines[-1].other)
        if not (max_temp or min_temp):
            (
                parsed_lines[0].other,
                max_temp,
                min_temp,
            ) = get_temp_min_and_max(parsed_lines[0].other)
        # Set start and end times based on the first line
        start_time, end_time = parsed_lines[0].start_time, parsed_lines[0].end_time
        parsed_lines[0].end_time = None
        parsed_lines = find_missing_taf_times(parsed_lines, start_time, end_time)
        parsed_lines = get_taf_flight_rules(parsed_lines)
    # Extract Oceania-specific data
    alts: Optional[List[str]] = None
    temps: Optional[List[str]] = None
    # Guard against empty parsed_lines to avoid an IndexError on "A" stations.
    # get_oceania_temp_and_alt returns (data, temps, alts) — the "T" digit
    # groups are temperatures and the "Q" groups altimeters — so unpack the
    # temperature list into temps and the QNH list into alts.
    if parsed_lines and station[0] == "A":
        (
            parsed_lines[-1].other,
            temps,
            alts,
        ) = get_oceania_temp_and_alt(parsed_lines[-1].other)
    # Convert wx codes
    for line in parsed_lines:
        line.other, line.wx_codes = get_wx_codes(line.other)
    sanitized = " ".join(i for i in (station, time, sanitized) if i)
    struct = TafData(
        raw=report,
        sanitized=sanitized,
        station=station,
        time=core.make_timestamp(time, target_date=issued),
        remarks=remarks,
        remarks_info=parse_remarks(remarks),
        forecast=parsed_lines,
        start_time=start_time,
        end_time=end_time,
        max_temp=max_temp,
        min_temp=min_temp,
        alts=alts,
        temps=temps,
    )
    return struct, units, sans

def parse_lines(
    lines: List[str], units: Units, sans: Sanitization, issued: Optional[date] = None
) -> List[TafLineData]:
    """Returns a list of parsed line dictionaries

    Consumes the input list in place, one line per iteration.
    """
    parsed: List[TafLineData] = []
    prob = ""
    while lines:
        raw_line = lines[0].strip()
        line = sanitize_line(raw_line, sans)
        # Detach a leading PROBxx group from the line body
        if line.startswith("PROB"):
            if len(line) == 6:
                # Standalone prob carries over to the next line
                prob, line = line, ""
            elif len(line) > 6:
                # Prob prefixes the current line
                prob, line = line[:6], line[6:].strip()
        if line:
            parsed_line = parse_line(line, units, sans, issued)
            parsed_line.probability = (
                None if " " in prob else core.make_number(prob[4:])
            )
            parsed_line.raw = raw_line
            if prob:
                parsed_line.sanitized = f"{prob} {parsed_line.sanitized}"
            prob = ""
            parsed.append(parsed_line)
        lines.pop(0)
    return parsed

def parse_line(
    line: str, units: Units, sans: Sanitization, issued: Optional[date] = None
) -> TafLineData:
    """Parser for the International TAF forecast variant

    Splits the line into tokens, then peels off the typed groups in order:
    type/times, wind shear, wind, visibility, clouds, and finally
    altimeter/icing/turbulence. Whatever remains lands in `other`.
    """
    # pylint: disable=too-many-locals
    data = clean_taf_list(core.dedupe(line.split()), sans)
    sanitized = " ".join(data)
    data, report_type, start_time, end_time, transition = get_type_and_times(data)
    data, wind_shear = get_wind_shear(data)
    (
        data,
        wind_direction,
        wind_speed,
        wind_gust,
        wind_variable_direction,
    ) = core.get_wind(data, units)
    clouds: List[Cloud] = []
    if "CAVOK" in data:
        # CAVOK stands in for both the visibility and cloud groups
        visibility = core.make_number("CAVOK")
        data.remove("CAVOK")
    else:
        data, visibility = core.get_visibility(data, units)
        data, clouds = core.get_clouds(data)
    other, altimeter, icing, turbulence = get_alt_ice_turb(data)
    return TafLineData(
        altimeter=altimeter,
        clouds=clouds,
        flight_rules="",
        other=other,
        visibility=visibility,
        wind_direction=wind_direction,
        wind_gust=wind_gust,
        wind_speed=wind_speed,
        wx_codes=[],
        end_time=core.make_timestamp(end_time, target_date=issued),
        icing=icing,
        probability=None,
        raw=line,
        sanitized=sanitized,
        start_time=core.make_timestamp(start_time, target_date=issued),
        transition_start=core.make_timestamp(transition, target_date=issued),
        turbulence=turbulence,
        type=report_type,
        wind_shear=wind_shear,
        wind_variable_direction=wind_variable_direction,
    )