avwx.service

Report Source Services

AVWX fetches the raw weather reports from third-party services via REST API calls or file downloads. We use Service objects to handle the request and extraction for us.

Basic Module Use

METARs and TAFs are the most widely supported report types, so an effort has been made to localize some of these services to regional sources. The get_service function was introduced to determine the best service for a given station.

# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country
# Get the station's preferred service and initialize it to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom, initialized to fetch METARs
# Fetch the current METAR
report = service.fetch(station)

Other report types require specific service classes, which are found in their respective submodules. However, you can normally let the report type classes handle these services for you.

Adding a New Service

If the existing services are not supplying the report(s) you need, adding a new service is easy. First, you'll need to determine whether your source can be scraped or requires downloading a file.

ScrapeService

For web scraping sources, you'll need to do the following things:

  • Add the base URL and method (if not "GET")
  • Implement ScrapeService._make_url to return the source URL and query parameters (POST-based services return their form payload from _post_data instead)
  • Implement the ScrapeService._extract function to return just the report string (starting at the station ID) from the response
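
The flow these hooks plug into can be sketched standalone. This is a minimal illustration of the request/extract pipeline, not the real avwx base classes: ToyScrapeService, the injected http_get callable, and the sample response string are all invented for the example.

```python
from typing import Callable


class ToyScrapeService:
    """Minimal stand-in for ScrapeService showing the fetch pipeline."""

    _url = "https://example.invalid/reports"
    method = "GET"

    def __init__(self, report_type: str, http_get: Callable[[str, dict], str]):
        self.report_type = report_type
        self._http_get = http_get  # injected so the sketch needs no network

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return the source URL and query parameters."""
        return self._url, {"type": self.report_type, "station": station}

    def _extract(self, raw: str, station: str) -> str:
        """Return just the report string, starting at the station ID."""
        start = raw.find(station)
        end = raw.find("=", start)
        return raw[start:end].strip()

    def fetch(self, station: str) -> str:
        url, params = self._make_url(station)
        raw = self._http_get(url, params)
        return self._extract(raw, station)


# Canned response standing in for a real HTTP call
def fake_get(url: str, params: dict) -> str:
    return "<pre>KJFK 120151Z 18004KT 10SM FEW250 24/17 A3009=</pre>"


service = ToyScrapeService("metar", fake_get)
print(service.fetch("KJFK"))  # -> KJFK 120151Z 18004KT 10SM FEW250 24/17 A3009
```

The real base class supplies fetch and the HTTP session; a new service normally only implements the two hooks.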

Let's look at the Mac service as an example:

class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")

Because this is a POST service, the form payload is returned by _post_data so fetch knows how to request the report. The response body is then given to _extract, which returns the report or list of reports.

Once your service is created, it can optionally be added to avwx.service.scrape.PREFERRED if it covers all stations with a known ICAO prefix, or to avwx.service.scrape.BY_COUNTRY if it covers all stations in a single country. This is how avwx.service.get_service determines the preferred service. For example, the Mac service is preferred over Noaa for all ICAOs starting with "SK", while Aubom is better for all Australian stations.
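
The resolution order can be sketched with plain dictionaries — ICAO prefix first, then country code, then the NOAA fallback. The string values here stand in for the real service classes:

```python
# Placeholder "service classes" standing in for Mac, Aubom, Noaa, etc.
PREFERRED = {"SK": "Mac"}      # keyed by ICAO prefix
BY_COUNTRY = {"AU": "Aubom"}   # keyed by ISO country code
DEFAULT = "Noaa"


def resolve_service(station: str, country_code: str) -> str:
    """Resolve prefix first, then country, then fall back to the default."""
    try:
        return PREFERRED[station[:2]]
    except KeyError:
        return BY_COUNTRY.get(country_code, DEFAULT)


print(resolve_service("SKBO", "CO"))  # -> Mac   (prefix match wins)
print(resolve_service("YWOL", "AU"))  # -> Aubom (country match)
print(resolve_service("KJFK", "US"))  # -> Noaa  (default)
```

This mirrors the body of avwx.service.scrape.get_service shown in the listing below, minus the contextlib.suppress convenience.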

FileService

For file-based sources, you'll need to do the following things:

  • Add the base URL and valid report types
  • Implement the FileService._urls property to iterate through source URLs
  • Implement the FileService._extract function to return just the report string (starting at the station ID) from the response

Let's look at a simplified version of the NoaaNbm service as an example:

class NoaaNbm(FileService):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than one day."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _extract(self, station: str, source: TextIO) -> str | None:
        """Return the report pulled from the saved file."""
        start = station + "   "
        end = self.report_type.upper() + " GUIDANCE"
        txt = source.read()
        txt = txt[txt.find(start) :]
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
            if not line:
                break
            lines.append(line)
        return "\n".join(lines) or None

In this example, we iterate through _urls looking for the most recent published file. URL iterators should always have a lower bound to stop iteration so the service can return a null response.
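
The iterator pattern with a hard cutoff can be reproduced standalone. The URL template and fixed timestamp below are illustrative only:

```python
import datetime as dt
from typing import Iterator

_URL = "https://example.invalid/blend.{}/{}/text/blend_nbstx.t{}z"


def hourly_urls(now: dt.datetime, max_age: dt.timedelta) -> Iterator[str]:
    """Yield hourly candidate URLs, newest first, stopping at the cutoff."""
    date = now
    cutoff = now - max_age
    while date > cutoff:
        timestamp = date.strftime(r"%Y%m%d")
        hour = str(date.hour).zfill(2)
        yield _URL.format(timestamp, hour, hour)
        date -= dt.timedelta(hours=1)


now = dt.datetime(2024, 1, 2, 5, 0, tzinfo=dt.timezone.utc)
urls = list(hourly_urls(now, dt.timedelta(days=1)))
print(len(urls))  # -> 24 (one candidate per hour, newest first)
print(urls[0])    # -> https://example.invalid/blend.20240102/05/text/blend_nbstx.t05z
```

The cutoff guarantees the generator is finite, which is what lets the service give up cleanly when no file exists.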

Once the file is downloaded, the requested station and file-like object are passed to the _extract method to find and return the report from the file. This method will not be called if the file doesn't exist.

""".. include:: ../../docs/service.md"""

from avwx.service.base import Service
from avwx.service.files import NoaaGfs, NoaaNbm
from avwx.service.scrape import (
    Amo,
    Aubom,
    Avt,
    FaaNotam,
    Mac,
    Nam,
    Noaa,
    Olbs,
    get_service,
)

__all__ = (
    "get_service",
    "Noaa",
    "Amo",
    "Aubom",
    "Avt",
    "Mac",
    "Nam",
    "Olbs",
    "FaaNotam",
    "NoaaGfs",
    "NoaaNbm",
    "Service",
)
def get_service(station: str, country_code: str) -> avwx.service.scrape.ScrapeService:
def get_service(station: str, country_code: str) -> ScrapeService:
    """Return the preferred scrape service for a given station.

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now avwx.service.Aubom init'd to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, Noaa)  # type: ignore

Return the preferred scrape service for a given station.

# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)
Noaa = <class 'avwx.service.scrape.NoaaApi'>
class Amo(avwx.service.scrape.StationScrape):
class Amo(StationScrape):
    """Request data from AMO KMA for Korean stations."""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and parameters."""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the report message from XML response."""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][f"{self.report_type.lower()}Msg"]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            msg = "The station might not exist"
            raise self._make_err(msg)
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())

Request data from AMO KMA for Korean stations.

default_timeout = 60
class Aubom(avwx.service.scrape.StationScrape):
class Aubom(StationScrape):
    """Request data from the Australian Bureau of Meteorology."""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            msg = "The station might not exist"
            raise self._make_err(msg) from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")

Request data from the Australian Bureau of Meteorology.

method = 'POST'
class Avt(avwx.service.scrape.StationScrape):
class Avt(StationScrape):
    """Request data from AVT/XiamenAir for China.
    NOTE: This should be replaced later with a gov+https source.
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""
        else:
            return text

Request data from AVT/XiamenAir for China. NOTE: This should be replaced later with a gov+https source.

class Mac(avwx.service.scrape.StationScrape):
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")

Request data from Meteorologia Aeronautica Civil for Colombian stations.

method = 'POST'
class Nam(avwx.service.scrape.StationScrape):
class Nam(StationScrape):
    """Request data from NorthAviMet for North Atlantic and Nordic countries."""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        starts = [f">{self.report_type.upper()} <", f">{station.upper()}<", "top'>"]
        report = self._simple_extract(raw, starts, "=")
        index = report.rfind(">")
        if index > -1:
            report = report[index + 1 :]
        return f"{station} {report.strip()}"

Request data from NorthAviMet for North Atlantic and Nordic countries.

class Olbs(avwx.service.scrape.StationScrape):
class Olbs(StationScrape):
    """Request data from India OLBS flight briefing."""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(raw, [f">{self.report_type.upper()}</div>", station], ["=", "<"])

Request data from India OLBS flight briefing.

class FaaNotam(avwx.service.scrape.ScrapeService):
class FaaNotam(ScrapeService):
    """Source NOTAMs from official FAA portal."""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Add coordinate deg/min/sec fields per float value."""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order."""
        data: dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data |= self._split_coord("lat", coord.lat)
            data |= self._split_coord("long", coord.lon)
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            msg = "Not enough info to request NOTAM data"
            raise InvalidRequest(msg)
        return data

    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams

Source NOTAMs from official FAA portal.

method = 'POST'
def fetch(self, icao: str | None = None, coord: avwx.structs.Coord | None = None, path: list[str] | None = None, radius: int = 10, timeout: int = 10) -> list[str]:
    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

Fetch NOTAM list from the service via ICAO, coordinate, or ident path.
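
fetch is a thin blocking wrapper that drives async_fetch to completion. The pattern can be sketched standalone; ToyService and its canned payload are illustrative, not part of avwx:

```python
import asyncio


class ToyService:
    """Sketch of the sync-over-async wrapper pattern that fetch uses."""

    async def async_fetch(self, icao: str) -> list[str]:
        # The real code awaits an HTTP call here; we fake the payload.
        await asyncio.sleep(0)
        return [f"{icao} NOTAM 1", f"{icao} NOTAM 2"]

    def fetch(self, icao: str) -> list[str]:
        # Blocking entry point: drive the coroutine to completion.
        return asyncio.run(self.async_fetch(icao))


print(ToyService().fetch("KJFK"))  # -> ['KJFK NOTAM 1', 'KJFK NOTAM 2']
```

Callers in async code should await async_fetch directly; the sync wrapper exists for convenience in scripts.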

async def async_fetch(self, icao: str | None = None, coord: avwx.structs.Coord | None = None, path: list[str] | None = None, radius: int = 10, timeout: int = 10) -> list[str]:
    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams

Async fetch NOTAM list from the service via ICAO, coordinate, or ident path.

class NoaaGfs(avwx.service.files.NoaaForecast):
class NoaaGfs(NoaaForecast):
    """Request forecast data from NOAA GFS FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    _cycles: ClassVar[dict[str, tuple[int, ...]]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through update cycles no older than two days."""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
            stacklevel=2,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station}   GFS", f"{self.report_type.upper()} GUIDANCE"

Request forecast data from NOAA GFS FTP servers.

class NoaaNbm(avwx.service.files.NoaaForecast):
class NoaaNbm(NoaaForecast):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than two days."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station}   ", f"{self.report_type.upper()} GUIDANCE"

Request forecast data from NOAA NBM FTP servers.

class Service:
class Service:
    """Base Service class for fetching reports."""

    report_type: str
    _url: ClassVar[str] = ""
    _valid_types: ClassVar[tuple[str, ...]] = ()

    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type

    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]

Base Service class for fetching reports.

Service(report_type: str)
    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type
report_type: str
root: str | None
    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]

Return the service's root URL.