avwx.forecast.base
Forecast report shared resources
1""" 2Forecast report shared resources 3""" 4 5# pylint: disable=too-many-arguments 6 7# stdlib 8from datetime import datetime, timedelta, timezone 9from typing import Callable, List, Optional, Tuple, Union 10 11# module 12from avwx.base import ManagedReport 13from avwx.parsing import core 14from avwx.service import Service 15from avwx.structs import Code, Number, ReportData, Timestamp 16 17 18def _trim_lines(lines: List[str], target: int) -> List[str]: 19 """Trim all lines to match the trimmed length of the target line""" 20 length = len(lines[target].strip()) 21 return [l[:length] for l in lines] 22 23 24def _split_line( 25 line: str, size: int = 3, prefix: int = 4, strip: str = " |" 26) -> List[str]: 27 """Evenly split a string while stripping elements""" 28 line = line[prefix:] 29 ret = [] 30 while len(line) >= size: 31 ret.append(line[:size].strip(strip)) 32 line = line[size:] 33 if line := line.strip(strip): 34 ret.append(line) 35 return ret 36 37 38def _timestamp(line: str) -> Timestamp: 39 """Returns the report timestamp from the first line""" 40 start = line.find("GUIDANCE") + 11 41 text = line[start : start + 16].strip() 42 timestamp = datetime.strptime(text, r"%m/%d/%Y %H%M") 43 return Timestamp(text, timestamp.replace(tzinfo=timezone.utc)) 44 45 46def _find_time_periods(line: List[str], timestamp: Optional[datetime]) -> List[dict]: 47 """Find and create the empty time periods""" 48 periods: List[Optional[Timestamp]] = [] 49 if timestamp is None: 50 periods = [None] * len(line) 51 else: 52 previous = timestamp.hour 53 for hourstr in line: 54 if not hourstr: 55 continue 56 hour = int(hourstr) 57 previous, difference = hour, hour - previous 58 if difference < 0: 59 difference += 24 60 timestamp += timedelta(hours=difference) 61 periods.append(Timestamp(hourstr, timestamp)) 62 return [{"time": time} for time in periods] 63 64 65def _init_parse(report: str) -> Tuple[ReportData, List[str]]: 66 """Returns the meta data and lines from a report string""" 67 report = report.strip() 68 lines = report.split("\n") 69 struct = ReportData( 70 raw=report, 71 sanitized=report, 72 station=report[:4], 73 time=_timestamp(lines[0]), 74 remarks=None, 75 ) 76 return struct, lines 77 78 79def _numbers( 80 line: str, 81 size: int = 3, 82 prefix: str = "", 83 postfix: str = "", 84 decimal: Optional[int] = None, 85 literal: bool = False, 86 special: Optional[dict] = None, 87) -> List[Optional[Number]]: 88 """Parse line into Number objects 89 90 Prefix, postfix, and decimal location are applied to value, not repr 91 92 Decimal is applied after prefix and postfix 93 """ 94 ret = [] 95 for item in _split_line(line, size=size): 96 value = None 97 if item: 98 value = prefix + item + postfix 99 if decimal is not None: 100 if abs(decimal) > len(value): 101 value = value.zfill(abs(decimal)) 102 value = f"{value[:decimal]}.{value[decimal:]}" 103 ret.append(core.make_number(value, repr=item, literal=literal, special=special)) 104 return ret 105 106 107def _decimal_10(line: str, size: int = 3) -> List[Optional[Number]]: 108 """Parse line into Number objects with 10ths decimal location""" 109 return _numbers(line, size, decimal=-1) 110 111 112def _decimal_100(line: str, size: int = 3) -> List[Optional[Number]]: 113 """Parse line into Number objects with 100ths decimal location""" 114 return _numbers(line, size, decimal=-2) 115 116 117def _number_10(line: str, size: int = 3) -> List[Optional[Number]]: 118 """Parse line into Number objects in tens""" 119 return _numbers(line, size, postfix="0") 120 121 122def _number_100(line: str, size: int = 3) -> List[Optional[Number]]: 123 """Parse line into Number objects in hundreds""" 124 return _numbers(line, size, postfix="00") 125 126 127def _direction(line: str, size: int = 3) -> List[Optional[Number]]: 128 """Parse line into Number objects in hundreds""" 129 return _numbers(line, size, postfix="0", literal=True) 130 131 132def _code(mapping: dict) -> Callable: 133 """Generates a conditional code mapping function""" 134 135 def func(line: str, size: int = 3) -> List[Union[Code, str, None]]: 136 ret: List[Union[Code, str, None]] = [] 137 for key in _split_line(line, size=size): 138 try: 139 ret.append(Code(key, mapping[key])) 140 except KeyError: 141 ret.append(key or None) 142 return ret 143 144 return func 145 146 147def _parse_lines( 148 periods: List[dict], 149 lines: List[str], 150 handlers: Union[dict, Callable], 151 size: int = 3, 152) -> None: 153 """Add data to time periods by parsing each line (element type) 154 155 Adds data in place 156 """ 157 for line in lines: 158 try: 159 key = line[:3] 160 *keys, handler = ( 161 handlers[key] if isinstance(handlers, dict) else handlers(key) 162 ) 163 except (IndexError, KeyError): 164 continue 165 values = handler(line, size=size) 166 values += [None] * (len(periods) - len(values)) 167 # pylint: disable=consider-using-enumerate 168 for i in range(len(periods)): 169 value = values[i] 170 if not value: 171 continue 172 if isinstance(value, tuple): 173 for j, key in enumerate(keys): 174 if value[j]: 175 periods[i][key] = value[j] 176 else: 177 periods[i][keys[0]] = value 178 179 180class Forecast(ManagedReport): 181 """Forecast base class""" 182 183 # pylint: disable=abstract-method 184 185 report_type: str 186 _service_class: Service 187 188 def __init__(self, code: str): 189 super().__init__(code) 190 self.service: Service = self._service_class(self.report_type) # type: ignore
181class Forecast(ManagedReport): 182 """Forecast base class""" 183 184 # pylint: disable=abstract-method 185 186 report_type: str 187 _service_class: Service 188 189 def __init__(self, code: str): 190 super().__init__(code) 191 self.service: Service = self._service_class(self.report_type) # type: ignore
Forecast base class
service: avwx.service.base.Service