avwx.service.files
These services are directed at FTP servers to find the most recent file associated with the search criteria. Files are stored in a temporary directory which is deleted when the program ends. Fetch requests will extract reports from the downloaded file until an update interval has been exceeded, at which point the service will check for a newer file. You can also have direct access to all downloaded reports.
"""
These services are directed at FTP servers to find the most recent file
associated with the search criteria. Files are stored in a temporary directory
which is deleted when the program ends. Fetch requests will extract reports
from the downloaded file until an update interval has been exceeded, at which
point the service will check for a newer file. You can also have direct access
to all downloaded reports.
"""

# pylint: disable=invalid-name,arguments-differ

# stdlib
import asyncio as aio
import atexit
import datetime as dt
import tempfile
import warnings
from contextlib import suppress
from pathlib import Path
from socket import gaierror
from typing import Dict, Iterator, List, Optional, TextIO, Tuple

# library
import httpx

# module
from avwx.service.base import Service
from avwx.station import valid_station


# Shared temporary directory for all downloaded source files.
# Removed by _cleanup at interpreter exit
_TEMP_DIR = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
_TEMP = Path(_TEMP_DIR.name)


# Transient network errors treated as a failed (non-fatal) download attempt
_HTTPX_EXCEPTIONS = (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.RemoteProtocolError)


@atexit.register
def _cleanup() -> None:
    """Deletes temporary files and directory at Python exit"""
    _TEMP_DIR.cleanup()


class FileService(Service):
    """Service class for fetching reports via managed source files"""

    # How stale the managed file may be before a new download is attempted
    update_interval: dt.timedelta = dt.timedelta(minutes=10)
    # True while a coroutine is replacing the managed file
    _updating: bool = False

    @property
    def _file_stem(self) -> str:
        """Filename prefix unique to this service subclass and report type"""
        return f"{self.__class__.__name__}.{self.report_type}"

    @property
    def _file(self) -> Optional[Path]:
        """Path object of the managed data file"""
        # At most one matching file exists; return the first glob hit
        for path in _TEMP.glob(f"{self._file_stem}*"):
            return path
        return None

    @property
    def last_updated(self) -> Optional[dt.datetime]:
        """When the file was last updated"""
        file = self._file
        if file is None:
            return None
        try:
            # Filename format is "<stem>.<unix timestamp>.txt"
            timestamp = int(file.name.split(".")[-2])
            return dt.datetime.fromtimestamp(timestamp, tz=dt.timezone.utc)
        except (AttributeError, ValueError):
            return None

    @property
    def is_outdated(self) -> bool:
        """If the file should be updated based on the update interval"""
        last = self.last_updated
        if last is None:
            return True
        now = dt.datetime.now(tz=dt.timezone.utc)
        return now > last + self.update_interval

    def _new_path(self) -> Path:
        """Returns a managed file path stamped with the current UTC time"""
        now = dt.datetime.now(tz=dt.timezone.utc).timestamp()
        # Integer part of the timestamp only
        timestamp = str(now).split(".", maxsplit=1)[0]
        return _TEMP / f"{self._file_stem}.{timestamp}.txt"

    async def _wait_until_updated(self) -> None:
        """Sleeps until the in-progress file update has finished"""
        # BUG FIX: was "while not self._updating", which returned immediately
        # while an update was still running instead of blocking until done
        while self._updating:
            await aio.sleep(0.01)

    @property
    def all(self) -> List[str]:
        """All report strings available after updating"""
        raise NotImplementedError()

    @property
    def _urls(self) -> Iterator[str]:
        """Candidate source URLs, most recent first"""
        raise NotImplementedError()

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns a station's report from an open source file, if found"""
        raise NotImplementedError()

    async def _update_file(self, timeout: int) -> bool:
        """Finds and saves the most recent file. Returns download success"""
        # Find the most recent available file
        async with httpx.AsyncClient(timeout=timeout) as client:
            for url in self._urls:
                try:
                    resp = await client.get(url)
                    if resp.status_code == 200:
                        break
                except _HTTPX_EXCEPTIONS + (gaierror,):
                    # Network hiccup or DNS failure; treat as unavailable
                    return False
            else:
                # No candidate URL returned a 200
                return False
        # Save successful file download
        new_path = self._new_path()
        with new_path.open("wb") as new_file:
            new_file.write(resp.content)
        return True

    async def update(self, wait: bool = False, timeout: int = 10) -> bool:
        """Update the stored file and returns success

        If wait, this will block if the file is already being updated
        """
        # Guard for other async calls
        if self._updating:
            if wait:
                await self._wait_until_updated()
                return True
            return False
        self._updating = True
        # Replace file only after a successful download
        old_path = self._file
        if not await self._update_file(timeout):
            self._updating = False
            return False
        if old_path:
            with suppress(FileNotFoundError):
                old_path.unlink()
        self._updating = False
        return True

    def fetch(
        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
    ) -> Optional[str]:
        """Fetch a report string from the source file

        If wait, this will block if the file is already being updated

        Can force the service to fetch a new file
        """
        return aio.run(self.async_fetch(station, wait, timeout, force))

    async def async_fetch(
        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
    ) -> Optional[str]:
        """Asynchronously fetch a report string from the source file

        If wait, this will block if the file is already being updated

        Can force the service to fetch a new file
        """
        valid_station(station)
        if wait and self._updating:
            await self._wait_until_updated()
        if (force or self.is_outdated) and not await self.update(wait, timeout):
            return None
        file = self._file
        if file is None:
            return None
        with file.open() as fin:
            report = self._extract(station, fin)
        return report


class NOAA_Forecast(FileService):
    """Subclass for extracting reports from NOAA FTP files"""

    @property
    def all(self) -> List[str]:
        """All report strings available after updating"""
        if self._file is None:
            return []
        with self._file.open() as fin:
            lines = fin.readlines()
        reports = []
        report = ""
        for line in lines:
            if line := line.strip():
                report += "\n" + line
            else:
                # Blank line terminates a report; ignore tiny fragments
                if len(report) > 10:
                    reports.append(report.strip())
                report = ""
        return reports

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end search strings bounding a station report"""
        raise NotImplementedError()

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start, end = self._index_target(station)
        txt = source.read()
        txt = txt[txt.find(start) :]
        # Skip the first 30 chars so the end marker inside the header is ignored
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
                if not line:
                    break
            lines.append(line)
        return "\n".join(lines) or None


class NOAA_NBM(NOAA_Forecast):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates no older than one day"""
        # NOTE: cutoff is 24 hours; docstring previously claimed two days
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end search strings for a station report"""
        return f"{station} ", f"{self.report_type.upper()} GUIDANCE"


class NOAA_GFS(NOAA_Forecast):
    """Requests forecast data from NOAA GFS FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    # Publication hours (UTC) for each report type
    _cycles: Dict[str, Tuple[int, ...]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through update cycles no older than one day"""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        # NOTE: cutoff is 24 hours; docstring previously claimed two days
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            # Most recent cycle first; skip cycles still in the future
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end search strings for a station report"""
        return f"{station} GFS", f"{self.report_type.upper()} GUIDANCE"


# https://www.ncei.noaa.gov/data/ncep-global-data-assimilation/access/202304/20230415/
class FileService(Service):
    """Service class for fetching reports via managed source files"""

    # How stale the managed file may be before a new download is attempted
    update_interval: dt.timedelta = dt.timedelta(minutes=10)
    # True while a coroutine is replacing the managed file
    _updating: bool = False

    @property
    def _file_stem(self) -> str:
        """Filename prefix unique to this service subclass and report type"""
        return f"{self.__class__.__name__}.{self.report_type}"

    @property
    def _file(self) -> Optional[Path]:
        """Path object of the managed data file"""
        # At most one matching file exists; return the first glob hit
        for path in _TEMP.glob(f"{self._file_stem}*"):
            return path
        return None

    @property
    def last_updated(self) -> Optional[dt.datetime]:
        """When the file was last updated"""
        file = self._file
        if file is None:
            return None
        try:
            # Filename format is "<stem>.<unix timestamp>.txt"
            timestamp = int(file.name.split(".")[-2])
            return dt.datetime.fromtimestamp(timestamp, tz=dt.timezone.utc)
        except (AttributeError, ValueError):
            return None

    @property
    def is_outdated(self) -> bool:
        """If the file should be updated based on the update interval"""
        last = self.last_updated
        if last is None:
            return True
        now = dt.datetime.now(tz=dt.timezone.utc)
        return now > last + self.update_interval

    def _new_path(self) -> Path:
        """Returns a managed file path stamped with the current UTC time"""
        now = dt.datetime.now(tz=dt.timezone.utc).timestamp()
        # Integer part of the timestamp only
        timestamp = str(now).split(".", maxsplit=1)[0]
        return _TEMP / f"{self._file_stem}.{timestamp}.txt"

    async def _wait_until_updated(self) -> None:
        """Sleeps until the in-progress file update has finished"""
        # BUG FIX: was "while not self._updating", which returned immediately
        # while an update was still running instead of blocking until done
        while self._updating:
            await aio.sleep(0.01)

    @property
    def all(self) -> List[str]:
        """All report strings available after updating"""
        raise NotImplementedError()

    @property
    def _urls(self) -> Iterator[str]:
        """Candidate source URLs, most recent first"""
        raise NotImplementedError()

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns a station's report from an open source file, if found"""
        raise NotImplementedError()

    async def _update_file(self, timeout: int) -> bool:
        """Finds and saves the most recent file. Returns download success"""
        # Find the most recent available file
        async with httpx.AsyncClient(timeout=timeout) as client:
            for url in self._urls:
                try:
                    resp = await client.get(url)
                    if resp.status_code == 200:
                        break
                except _HTTPX_EXCEPTIONS + (gaierror,):
                    # Network hiccup or DNS failure; treat as unavailable
                    return False
            else:
                # No candidate URL returned a 200
                return False
        # Save successful file download
        new_path = self._new_path()
        with new_path.open("wb") as new_file:
            new_file.write(resp.content)
        return True

    async def update(self, wait: bool = False, timeout: int = 10) -> bool:
        """Update the stored file and returns success

        If wait, this will block if the file is already being updated
        """
        # Guard for other async calls
        if self._updating:
            if wait:
                await self._wait_until_updated()
                return True
            return False
        self._updating = True
        # Replace file only after a successful download
        old_path = self._file
        if not await self._update_file(timeout):
            self._updating = False
            return False
        if old_path:
            with suppress(FileNotFoundError):
                old_path.unlink()
        self._updating = False
        return True

    def fetch(
        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
    ) -> Optional[str]:
        """Fetch a report string from the source file

        If wait, this will block if the file is already being updated

        Can force the service to fetch a new file
        """
        return aio.run(self.async_fetch(station, wait, timeout, force))

    async def async_fetch(
        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
    ) -> Optional[str]:
        """Asynchronously fetch a report string from the source file

        If wait, this will block if the file is already being updated

        Can force the service to fetch a new file
        """
        valid_station(station)
        if wait and self._updating:
            await self._wait_until_updated()
        if (force or self.is_outdated) and not await self.update(wait, timeout):
            return None
        file = self._file
        if file is None:
            return None
        with file.open() as fin:
            report = self._extract(station, fin)
        return report
Service class for fetching reports via managed source files
async def update(self, wait: bool = False, timeout: int = 10) -> bool:
    """Refresh the managed file, returning True on success

    If wait, this will block while another call is already updating
    """
    if self._updating:
        # Another coroutine owns the update; optionally wait it out
        if not wait:
            return False
        await self._wait_until_updated()
        return True
    self._updating = True
    previous = self._file
    success = await self._update_file(timeout)
    if not success:
        self._updating = False
        return False
    # Drop the superseded file now that the new one is in place
    if previous:
        with suppress(FileNotFoundError):
            previous.unlink()
    self._updating = False
    return True
Update the stored file and returns success
If wait, this will block if the file is already being updated
def fetch(
    self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
) -> Optional[str]:
    """Synchronous entry point: fetch a report string from the source file

    If wait, this will block while the file is already being updated

    Set force to download a fresh file regardless of age
    """
    coro = self.async_fetch(station, wait, timeout, force)
    return aio.run(coro)
Fetch a report string from the source file
If wait, this will block if the file is already being updated
Can force the service to fetch a new file
async def async_fetch(
    self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
) -> Optional[str]:
    """Asynchronously fetch a report string from the source file

    If wait, this will block while the file is already being updated

    Set force to download a fresh file regardless of age
    """
    valid_station(station)
    if wait and self._updating:
        await self._wait_until_updated()
    needs_refresh = force or self.is_outdated
    if needs_refresh and not await self.update(wait, timeout):
        return None
    path = self._file
    if path is None:
        return None
    with path.open() as fin:
        return self._extract(station, fin)
Asynchronously fetch a report string from the source file
If wait, this will block if the file is already being updated
Can force the service to fetch a new file
Inherited Members
class NOAA_Forecast(FileService):
    """Subclass for extracting reports from NOAA FTP files"""

    @property
    def all(self) -> List[str]:
        """All report strings available after updating"""
        path = self._file
        if path is None:
            return []
        with path.open() as fin:
            content = fin.readlines()
        found = []
        buffer = ""
        for raw in content:
            text = raw.strip()
            if not text:
                # A blank line ends the current report; skip tiny fragments
                if len(buffer) > 10:
                    found.append(buffer.strip())
                buffer = ""
            else:
                buffer += "\n" + text
        return found

    def _index_target(self, station: str) -> Tuple[str, str]:
        raise NotImplementedError()

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        begin, finish = self._index_target(station)
        data = source.read()
        data = data[data.find(begin) :]
        # Search for the end marker past the report header
        data = data[: data.find(finish, 30)]
        collected = []
        for row in data.split("\n"):
            if "CLIMO" in row:
                # CLIMO rows are kept verbatim, without stripping
                collected.append(row)
                continue
            trimmed = row.strip()
            if not trimmed:
                break
            collected.append(trimmed)
        return "\n".join(collected) or None
Subclass for extracting reports from NOAA FTP files
class NOAA_NBM(NOAA_Forecast):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates no older than one day"""
        # NOTE: cutoff is 24 hours; docstring previously claimed two days
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end search strings for a station report"""
        return f"{station} ", f"{self.report_type.upper()} GUIDANCE"
Requests forecast data from NOAA NBM FTP servers
class NOAA_GFS(NOAA_Forecast):
    """Requests forecast data from NOAA GFS FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    # Publication hours (UTC) for each report type
    _cycles: Dict[str, Tuple[int, ...]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through update cycles no older than one day"""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        # NOTE: cutoff is 24 hours; docstring previously claimed two days
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            # Most recent cycle first; skip cycles still in the future
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end search strings for a station report"""
        return f"{station} GFS", f"{self.report_type.upper()} GUIDANCE"
Requests forecast data from NOAA GFS FTP servers