avwx.service.files

These services are directed at FTP servers to find the most recent file associated with the search criteria. Files are stored in a temporary directory which is deleted when the program ends. Fetch requests will extract reports from the downloaded file until an update interval has been exceeded, at which point the service will check for a newer file. You can also have direct access to all downloaded reports.

  1"""
  2These services are directed at FTP servers to find the most recent file
  3associated with the search criteria. Files are stored in a temporary directory
  4which is deleted when the program ends. Fetch requests will extract reports
  5from the downloaded file until an update interval has been exceeded, at which
  6point the service will check for a newer file. You can also have direct access
  7to all downloaded reports.
  8"""
  9
 10# pylint: disable=invalid-name,arguments-differ
 11
 12# stdlib
 13import asyncio as aio
 14import atexit
 15import datetime as dt
 16import tempfile
 17import warnings
 18from contextlib import suppress
 19from pathlib import Path
 20from socket import gaierror
 21from typing import Dict, Iterator, List, Optional, TextIO, Tuple
 22
 23# library
 24import httpx
 25
 26# module
 27from avwx.service.base import Service
 28from avwx.station import valid_station
 29
 30
 31_TEMP_DIR = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
 32_TEMP = Path(_TEMP_DIR.name)
 33
 34
 35_HTTPX_EXCEPTIONS = (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.RemoteProtocolError)
 36
 37
 38@atexit.register
 39def _cleanup() -> None:
 40    """Deletes temporary files and directory at Python exit"""
 41    _TEMP_DIR.cleanup()
 42
 43
 44class FileService(Service):
 45    """Service class for fetching reports via managed source files"""
 46
 47    update_interval: dt.timedelta = dt.timedelta(minutes=10)
 48    _updating: bool = False
 49
 50    @property
 51    def _file_stem(self) -> str:
 52        return f"{self.__class__.__name__}.{self.report_type}"
 53
 54    @property
 55    def _file(self) -> Optional[Path]:
 56        """Path object of the managed data file"""
 57        for path in _TEMP.glob(f"{self._file_stem}*"):
 58            return path
 59        return None
 60
 61    @property
 62    def last_updated(self) -> Optional[dt.datetime]:
 63        """When the file was last updated"""
 64        file = self._file
 65        if file is None:
 66            return None
 67        try:
 68            timestamp = int(file.name.split(".")[-2])
 69            return dt.datetime.fromtimestamp(timestamp, tz=dt.timezone.utc)
 70        except (AttributeError, ValueError):
 71            return None
 72
 73    @property
 74    def is_outdated(self) -> bool:
 75        """If the file should be updated based on the update interval"""
 76        last = self.last_updated
 77        if last is None:
 78            return True
 79        now = dt.datetime.now(tz=dt.timezone.utc)
 80        return now > last + self.update_interval
 81
 82    def _new_path(self) -> Path:
 83        now = dt.datetime.now(tz=dt.timezone.utc).timestamp()
 84        timestamp = str(now).split(".", maxsplit=1)[0]
 85        return _TEMP / f"{self._file_stem}.{timestamp}.txt"
 86
 87    async def _wait_until_updated(self) -> None:
 88        while not self._updating:
 89            await aio.sleep(0.01)
 90
 91    @property
 92    def all(self) -> List[str]:
 93        """All report strings available after updating"""
 94        raise NotImplementedError()
 95
 96    @property
 97    def _urls(self) -> Iterator[str]:
 98        raise NotImplementedError()
 99
100    def _extract(self, station: str, source: TextIO) -> Optional[str]:
101        raise NotImplementedError()
102
103    async def _update_file(self, timeout: int) -> bool:
104        """Finds and saves the most recent file"""
105        # Find the most recent file
106        async with httpx.AsyncClient(timeout=timeout) as client:
107            for url in self._urls:
108                try:
109                    resp = await client.get(url)
110                    if resp.status_code == 200:
111                        break
112                except _HTTPX_EXCEPTIONS:
113                    return False
114                except gaierror:
115                    return False
116            else:
117                return False
118        # Save successful file download
119        new_path = self._new_path()
120        with new_path.open("wb") as new_file:
121            new_file.write(resp.content)
122        return True
123
124    async def update(self, wait: bool = False, timeout: int = 10) -> bool:
125        """Update the stored file and returns success
126
127        If wait, this will block if the file is already being updated
128        """
129        # Guard for other async calls
130        if self._updating:
131            if wait:
132                await self._wait_until_updated()
133                return True
134            return False
135        self._updating = True
136        # Replace file
137        old_path = self._file
138        if not await self._update_file(timeout):
139            self._updating = False
140            return False
141        if old_path:
142            with suppress(FileNotFoundError):
143                old_path.unlink()
144        self._updating = False
145        return True
146
147    def fetch(
148        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
149    ) -> Optional[str]:
150        """Fetch a report string from the source file
151
152        If wait, this will block if the file is already being updated
153
154        Can force the service to fetch a new file
155        """
156        return aio.run(self.async_fetch(station, wait, timeout, force))
157
158    async def async_fetch(
159        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
160    ) -> Optional[str]:
161        """Asynchronously fetch a report string from the source file
162
163        If wait, this will block if the file is already being updated
164
165        Can force the service to fetch a new file
166        """
167        valid_station(station)
168        if wait and self._updating:
169            await self._wait_until_updated()
170        if (force or self.is_outdated) and not await self.update(wait, timeout):
171            return None
172        file = self._file
173        if file is None:
174            return None
175        with file.open() as fin:
176            report = self._extract(station, fin)
177        return report
178
179
class NOAA_Forecast(FileService):
    """Subclass for extracting reports from NOAA FTP files"""

    @property
    def all(self) -> List[str]:
        """All report strings available after updating"""
        if self._file is None:
            return []
        with self._file.open() as fin:
            lines = fin.readlines()
        reports = []
        report = ""
        for line in lines:
            if line := line.strip():
                report += "\n" + line
            else:
                # A blank line ends the current report. Skip fragments too
                # short to be a real report
                if len(report) > 10:
                    reports.append(report.strip())
                report = ""
        return reports

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end text markers bounding a station's report"""
        raise NotImplementedError()

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start, end = self._index_target(station)
        txt = source.read()
        # BUG FIX: the find() results were previously used unchecked, so a
        # missing start marker sliced with -1 and a missing end marker
        # silently dropped the report's final character
        begin = txt.find(start)
        if begin == -1:
            return None
        txt = txt[begin:]
        # Search from offset 30 so the end marker isn't matched inside the
        # report header itself
        stop = txt.find(end, 30)
        if stop != -1:
            txt = txt[:stop]
        lines = []
        for line in txt.split("\n"):
            # CLIMO rows are kept unstripped — presumably to preserve their
            # column alignment; TODO confirm against source file format
            if "CLIMO" not in line:
                line = line.strip()
            if not line:
                break
            lines.append(line)
        return "\n".join(lines) or None
218
219
class NOAA_NBM(NOAA_Forecast):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates, newest first, no older than one day"""
        # NOTE: the docstring previously said "two days", but the cutoff is
        # one day as implemented below
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end text markers bounding a station's report"""
        return f"{station}   ", f"{self.report_type.upper()} GUIDANCE"
239
240
class NOAA_GFS(NOAA_Forecast):
    """Requests forecast data from NOAA GFS FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    # Hours of the day at which each report type is published
    _cycles: Dict[str, Tuple[int, ...]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through update cycles, newest first, no older than one day"""
        # NOTE: the docstring previously said "two days", but the cutoff is
        # one day as implemented below
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            # Walk each day's publish cycles from latest to earliest,
            # skipping cycles that haven't occurred yet today
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            # Step past midnight so the next pass covers the previous day
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> Tuple[str, str]:
        """Returns the start and end text markers bounding a station's report"""
        return f"{station}   GFS", f"{self.report_type.upper()} GUIDANCE"
271
272
273# https://www.ncei.noaa.gov/data/ncep-global-data-assimilation/access/202304/20230415/
class FileService(avwx.service.base.Service):
 45class FileService(Service):
 46    """Service class for fetching reports via managed source files"""
 47
 48    update_interval: dt.timedelta = dt.timedelta(minutes=10)
 49    _updating: bool = False
 50
 51    @property
 52    def _file_stem(self) -> str:
 53        return f"{self.__class__.__name__}.{self.report_type}"
 54
 55    @property
 56    def _file(self) -> Optional[Path]:
 57        """Path object of the managed data file"""
 58        for path in _TEMP.glob(f"{self._file_stem}*"):
 59            return path
 60        return None
 61
 62    @property
 63    def last_updated(self) -> Optional[dt.datetime]:
 64        """When the file was last updated"""
 65        file = self._file
 66        if file is None:
 67            return None
 68        try:
 69            timestamp = int(file.name.split(".")[-2])
 70            return dt.datetime.fromtimestamp(timestamp, tz=dt.timezone.utc)
 71        except (AttributeError, ValueError):
 72            return None
 73
 74    @property
 75    def is_outdated(self) -> bool:
 76        """If the file should be updated based on the update interval"""
 77        last = self.last_updated
 78        if last is None:
 79            return True
 80        now = dt.datetime.now(tz=dt.timezone.utc)
 81        return now > last + self.update_interval
 82
 83    def _new_path(self) -> Path:
 84        now = dt.datetime.now(tz=dt.timezone.utc).timestamp()
 85        timestamp = str(now).split(".", maxsplit=1)[0]
 86        return _TEMP / f"{self._file_stem}.{timestamp}.txt"
 87
 88    async def _wait_until_updated(self) -> None:
 89        while not self._updating:
 90            await aio.sleep(0.01)
 91
 92    @property
 93    def all(self) -> List[str]:
 94        """All report strings available after updating"""
 95        raise NotImplementedError()
 96
 97    @property
 98    def _urls(self) -> Iterator[str]:
 99        raise NotImplementedError()
100
101    def _extract(self, station: str, source: TextIO) -> Optional[str]:
102        raise NotImplementedError()
103
104    async def _update_file(self, timeout: int) -> bool:
105        """Finds and saves the most recent file"""
106        # Find the most recent file
107        async with httpx.AsyncClient(timeout=timeout) as client:
108            for url in self._urls:
109                try:
110                    resp = await client.get(url)
111                    if resp.status_code == 200:
112                        break
113                except _HTTPX_EXCEPTIONS:
114                    return False
115                except gaierror:
116                    return False
117            else:
118                return False
119        # Save successful file download
120        new_path = self._new_path()
121        with new_path.open("wb") as new_file:
122            new_file.write(resp.content)
123        return True
124
125    async def update(self, wait: bool = False, timeout: int = 10) -> bool:
126        """Update the stored file and returns success
127
128        If wait, this will block if the file is already being updated
129        """
130        # Guard for other async calls
131        if self._updating:
132            if wait:
133                await self._wait_until_updated()
134                return True
135            return False
136        self._updating = True
137        # Replace file
138        old_path = self._file
139        if not await self._update_file(timeout):
140            self._updating = False
141            return False
142        if old_path:
143            with suppress(FileNotFoundError):
144                old_path.unlink()
145        self._updating = False
146        return True
147
148    def fetch(
149        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
150    ) -> Optional[str]:
151        """Fetch a report string from the source file
152
153        If wait, this will block if the file is already being updated
154
155        Can force the service to fetch a new file
156        """
157        return aio.run(self.async_fetch(station, wait, timeout, force))
158
159    async def async_fetch(
160        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
161    ) -> Optional[str]:
162        """Asynchronously fetch a report string from the source file
163
164        If wait, this will block if the file is already being updated
165
166        Can force the service to fetch a new file
167        """
168        valid_station(station)
169        if wait and self._updating:
170            await self._wait_until_updated()
171        if (force or self.is_outdated) and not await self.update(wait, timeout):
172            return None
173        file = self._file
174        if file is None:
175            return None
176        with file.open() as fin:
177            report = self._extract(station, fin)
178        return report

Service class for fetching reports via managed source files

update_interval: datetime.timedelta = datetime.timedelta(seconds=600)
last_updated: Optional[datetime.datetime]

When the file was last updated

is_outdated: bool

If the file should be updated based on the update interval

all: List[str]

All report strings available after updating

async def update(self, wait: bool = False, timeout: int = 10) -> bool:
125    async def update(self, wait: bool = False, timeout: int = 10) -> bool:
126        """Update the stored file and returns success
127
128        If wait, this will block if the file is already being updated
129        """
130        # Guard for other async calls
131        if self._updating:
132            if wait:
133                await self._wait_until_updated()
134                return True
135            return False
136        self._updating = True
137        # Replace file
138        old_path = self._file
139        if not await self._update_file(timeout):
140            self._updating = False
141            return False
142        if old_path:
143            with suppress(FileNotFoundError):
144                old_path.unlink()
145        self._updating = False
146        return True

Update the stored file and returns success

If wait, this will block if the file is already being updated

def fetch( self, station: str, wait: bool = True, timeout: int = 10, force: bool = False) -> Optional[str]:
148    def fetch(
149        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
150    ) -> Optional[str]:
151        """Fetch a report string from the source file
152
153        If wait, this will block if the file is already being updated
154
155        Can force the service to fetch a new file
156        """
157        return aio.run(self.async_fetch(station, wait, timeout, force))

Fetch a report string from the source file

If wait, this will block if the file is already being updated

Can force the service to fetch a new file

async def async_fetch( self, station: str, wait: bool = True, timeout: int = 10, force: bool = False) -> Optional[str]:
159    async def async_fetch(
160        self, station: str, wait: bool = True, timeout: int = 10, force: bool = False
161    ) -> Optional[str]:
162        """Asynchronously fetch a report string from the source file
163
164        If wait, this will block if the file is already being updated
165
166        Can force the service to fetch a new file
167        """
168        valid_station(station)
169        if wait and self._updating:
170            await self._wait_until_updated()
171        if (force or self.is_outdated) and not await self.update(wait, timeout):
172            return None
173        file = self._file
174        if file is None:
175            return None
176        with file.open() as fin:
177            report = self._extract(station, fin)
178        return report

Asynchronously fetch a report string from the source file

If wait, this will block if the file is already being updated

Can force the service to fetch a new file

class NOAA_Forecast(FileService):
181class NOAA_Forecast(FileService):
182    """Subclass for extracting reports from NOAA FTP files"""
183
184    @property
185    def all(self) -> List[str]:
186        """All report strings available after updating"""
187        if self._file is None:
188            return []
189        with self._file.open() as fin:
190            lines = fin.readlines()
191        reports = []
192        report = ""
193        for line in lines:
194            if line := line.strip():
195                report += "\n" + line
196            else:
197                if len(report) > 10:
198                    reports.append(report.strip())
199                report = ""
200        return reports
201
202    def _index_target(self, station: str) -> Tuple[str, str]:
203        raise NotImplementedError()
204
205    def _extract(self, station: str, source: TextIO) -> Optional[str]:
206        """Returns report pulled from the saved file"""
207        start, end = self._index_target(station)
208        txt = source.read()
209        txt = txt[txt.find(start) :]
210        txt = txt[: txt.find(end, 30)]
211        lines = []
212        for line in txt.split("\n"):
213            if "CLIMO" not in line:
214                line = line.strip()
215            if not line:
216                break
217            lines.append(line)
218        return "\n".join(lines) or None

Subclass for extracting reports from NOAA FTP files

all: List[str]

All report strings available after updating

class NOAA_NBM(NOAA_Forecast):
221class NOAA_NBM(NOAA_Forecast):
222    """Requests forecast data from NOAA NBM FTP servers"""
223
224    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
225    _valid_types = ("nbh", "nbs", "nbe", "nbx")
226
227    @property
228    def _urls(self) -> Iterator[str]:
229        """Iterates through hourly updates no older than one day"""
230        date = dt.datetime.now(tz=dt.timezone.utc)
231        cutoff = date - dt.timedelta(days=1)
232        while date > cutoff:
233            timestamp = date.strftime(r"%Y%m%d")
234            hour = str(date.hour).zfill(2)
235            yield self._url.format(timestamp, hour, self.report_type, hour)
236            date -= dt.timedelta(hours=1)
237
238    def _index_target(self, station: str) -> Tuple[str, str]:
239        return f"{station}   ", f"{self.report_type.upper()} GUIDANCE"

Requests forecast data from NOAA NBM FTP servers

class NOAA_GFS(NOAA_Forecast):
242class NOAA_GFS(NOAA_Forecast):
243    """Requests forecast data from NOAA GFS FTP servers"""
244
245    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
246    _valid_types = ("mav", "mex")
247
248    _cycles: Dict[str, Tuple[int, ...]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}
249
250    @property
251    def _urls(self) -> Iterator[str]:
252        """Iterates through update cycles no older than one day"""
253        warnings.warn(
254            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
255            DeprecationWarning,
256        )
257        now = dt.datetime.now(tz=dt.timezone.utc)
258        date = dt.datetime.now(tz=dt.timezone.utc)
259        cutoff = date - dt.timedelta(days=1)
260        while date > cutoff:
261            for cycle in reversed(self._cycles[self.report_type]):
262                date = date.replace(hour=cycle)
263                if date > now:
264                    continue
265                timestamp = date.strftime(r"%Y%m%d")
266                hour = str(date.hour).zfill(2)
267                yield self._url.format(timestamp, self.report_type, hour)
268            date -= dt.timedelta(hours=1)
269
270    def _index_target(self, station: str) -> Tuple[str, str]:
271        return f"{station}   GFS", f"{self.report_type.upper()} GUIDANCE"

Requests forecast data from NOAA GFS FTP servers