avwx.service

Report Source Services

AVWX fetches raw weather reports from third-party services via REST API calls or file downloads. We use Service objects to handle the request and extraction for us.

Basic Module Use

METARs and TAFs are the most widely supported report types, so an effort has been made to localize some of these services to a regional source. The get_service function was introduced to determine the best service for a given station.

# Fetch Australian reports
station = 'YWOL'
country = 'AU' # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)('metar')
# service is now avwx.service.Aubom init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)

Other report types require specific service classes which are found in their respective submodules. However, you can normally let the report type classes handle these services for you.

Adding a New Service

If the existing services are not supplying the report(s) you need, adding a new service is easy. First, you'll need to determine whether your source can be scraped or the data must be downloaded as a file.

ScrapeService

For web scraping sources, you'll need to do the following things:

  • Add the base URL and method (if not "GET")
  • Implement ScrapeService._make_url to return the source URL and query parameters
  • Implement ScrapeService._extract to return just the report string (starting at the station ID) from the response

Let's look at the MAC service as an example:

class MAC(StationScrape):
    """Requests data from Meteorologia Aeronautica Civil for Columbian stations"""

    _url = "http://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url, {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report message using string finding"""
        return self._simple_extract(raw, f"{station.upper()} ", "=")

Our URL and query parameters are returned by _make_url so fetch knows how to request the report. The result of this query is passed to _extract, which returns the report or a list of reports.
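The _simple_extract helper used here is essentially a string window: find the start marker, keep everything up to the end marker. A minimal stand-in with that assumed behavior (the real helper lives on ScrapeService and may differ in details):

```python
def simple_extract(raw: str, start: str, end: str) -> str:
    """Stand-in for ScrapeService._simple_extract (assumed behavior):
    return the substring from the start marker up to the end marker."""
    index = raw.find(start)
    if index == -1:
        return ""
    raw = raw[index:]
    index = raw.find(end)
    if index != -1:
        raw = raw[:index]
    return raw.strip()

html = "<pre>SKBO 012200Z 24004KT 9999 SCT020=</pre>"
print(simple_extract(html, "SKBO ", "="))  # SKBO 012200Z 24004KT 9999 SCT020
```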

Once your service is created, it can optionally be added to avwx.service.scrape.PREFERRED if it covers all stations with a known ICAO prefix, or to avwx.service.scrape.BY_COUNTRY if it covers all stations in a single country. This is how avwx.service.get_service determines the preferred service. For example, the Mac service is preferred over Noaa for all ICAOs starting with "SK", while Aubom is better for all Australian stations.
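The resolution order itself is simple: check the two-letter ICAO prefix first, then fall back to the country code, then to the default NOAA service. A toy mirror of that lookup (dictionaries abbreviated to strings; the real tables live in avwx.service.scrape and map to service classes):

```python
# Toy mirror of get_service's resolution order (entries abbreviated)
PREFERRED = {"SK": "Mac"}        # ICAO prefix -> preferred service
BY_COUNTRY = {"AU": "Aubom"}     # country code -> preferred service
DEFAULT = "Noaa"

def get_service(station: str, country_code: str) -> str:
    try:
        return PREFERRED[station[:2]]  # a prefix match wins outright
    except KeyError:
        return BY_COUNTRY.get(country_code, DEFAULT)

print(get_service("SKBO", "CO"))  # Mac
print(get_service("YWOL", "AU"))  # Aubom
print(get_service("KJFK", "US"))  # Noaa
```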

FileService

For file-based sources, you'll need to do the following things:

  • Add the base URL and valid report types
  • Implement the FileService._urls property to iterate through source URLs
  • Implement the FileService._extract function to return just the report string (starting at the station ID) from the response

Let's look at the NoaaNbm service as an example:

class NoaaNbm(FileService):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates from the past day"""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start = station + "   "
        end = self.report_type.upper() + " GUIDANCE"
        txt = source.read()
        txt = txt[txt.find(start) :]
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
            if not line:
                break
            lines.append(line)
        return "\n".join(lines) or None

In this example, we iterate through _urls looking for the most recent published file. URL iterators should always have a lower bound to stop iteration so the service can return a null response.
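As a sketch, an hourly iterator bounded at one day yields at most 24 candidate URLs before the service gives up (the URL template here is made up for illustration):

```python
import datetime as dt
from typing import Iterator

def hourly_urls(template: str) -> Iterator[str]:
    """Yield candidate URLs for each hour of the past day, newest first."""
    date = dt.datetime.now(tz=dt.timezone.utc)
    cutoff = date - dt.timedelta(days=1)
    while date > cutoff:
        yield template.format(date.strftime("%Y%m%d"), f"{date.hour:02d}")
        date -= dt.timedelta(hours=1)

urls = list(hourly_urls("https://example.com/blend.{}/t{}z"))
print(len(urls))  # 24: the iterator is bounded, so fetch can fail cleanly
```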

Once the file is downloaded, the requested station and file-like object are passed to the _extract method to find and return the report from the file. This method will not be called if the file doesn't exist.
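A stripped-down _extract might look like the following, using StringIO as a stand-in for the downloaded file; the station-block format here is invented for illustration:

```python
import io
from typing import Optional, TextIO

def extract(station: str, source: TextIO) -> Optional[str]:
    """Find a station's block in a bulk text file; None if absent."""
    text = source.read()
    start = text.find(station)
    if start == -1:
        return None  # lets the service fall back to a null response
    end = text.find("\n\n", start)
    return text[start : end if end != -1 else len(text)].strip()

bulk = "KJFK 251151Z 33016KT 10SM FEW250 03/M08 A3012\n\nKBOS 251154Z ..."
print(extract("KJFK", io.StringIO(bulk)))
print(extract("EGLL", io.StringIO(bulk)))  # None
```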

 1""".. include:: ../../docs/service.md"""
 2
 3from avwx.service.base import Service
 4from avwx.service.files import NoaaGfs, NoaaNbm
 5from avwx.service.scrape import (
 6    Amo,
 7    Aubom,
 8    Avt,
 9    FaaNotam,
10    Mac,
11    Nam,
12    Noaa,
13    Olbs,
14    get_service,
15)
16
17__all__ = (
18    "get_service",
19    "Noaa",
20    "Amo",
21    "Aubom",
22    "Avt",
23    "Mac",
24    "Nam",
25    "Olbs",
26    "FaaNotam",
27    "NoaaGfs",
28    "NoaaNbm",
29    "Service",
30)
def get_service(station: str, country_code: str) -> avwx.service.scrape.ScrapeService:
def get_service(station: str, country_code: str) -> ScrapeService:
    """Return the preferred scrape service for a given station.

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now avwx.service.Aubom init'd to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, Noaa)  # type: ignore

Return the preferred scrape service for a given station.

# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)
Noaa = <class 'avwx.service.scrape.NoaaApi'>
class Amo(avwx.service.scrape.StationScrape):
class Amo(StationScrape):
    """Request data from AMO KMA for Korean stations."""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and parameters."""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the report message from XML response."""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][f"{self.report_type.lower()}Msg"]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            msg = "The station might not exist"
            raise self._make_err(msg)
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())

Request data from AMO KMA for Korean stations.

default_timeout = 60
class Aubom(avwx.service.scrape.StationScrape):
class Aubom(StationScrape):
    """Request data from the Australian Bureau of Meteorology."""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            msg = "The station might not exist"
            raise self._make_err(msg) from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")

Request data from the Australian Bureau of Meteorology.

method = 'POST'
class Avt(avwx.service.scrape.StationScrape):
class Avt(StationScrape):
    """Request data from AVT/XiamenAir for China.
    NOTE: This should be replaced later with a gov+https source.
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""
        else:
            return text

Request data from AVT/XiamenAir for China. NOTE: This should be replaced later with a gov+https source.

class Mac(avwx.service.scrape.StationScrape):
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")

Request data from Meteorologia Aeronautica Civil for Colombian stations.

method = 'POST'
class Nam(avwx.service.scrape.StationScrape):
class Nam(StationScrape):
    """Request data from NorthAviMet for North Atlantic and Nordic countries."""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        starts = [f">{self.report_type.upper()} <", f">{station.upper()}<", "top'>"]
        report = self._simple_extract(raw, starts, "=")
        index = report.rfind(">")
        if index > -1:
            report = report[index + 1 :]
        return f"{station} {report.strip()}"

Request data from NorthAviMet for North Atlantic and Nordic countries.

class Olbs(avwx.service.scrape.StationScrape):
class Olbs(StationScrape):
    """Request data from India OLBS flight briefing."""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(raw, [f">{self.report_type.upper()}</div>", station], ["=", "<"])

Request data from India OLBS flight briefing.

class FaaNotam(avwx.service.scrape.ScrapeService):
class FaaNotam(ScrapeService):
    """Source NOTAMs from official FAA portal."""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Add coordinate deg/min/sec fields per float value."""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order."""
        data: dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data |= self._split_coord("lat", coord.lat)
            data |= self._split_coord("long", coord.lon)
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            msg = "Not enough info to request NOTAM data"
            raise InvalidRequest(msg)
        return data

    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams

Source NOTAMs from official FAA portal.

method = 'POST'
def fetch( self, icao: str | None = None, coord: avwx.structs.Coord | None = None, path: list[str] | None = None, radius: int = 10, timeout: int = 10) -> list[str]:
    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

Fetch NOTAM list from the service via ICAO, coordinate, or ident path.

async def async_fetch( self, icao: str | None = None, coord: avwx.structs.Coord | None = None, path: list[str] | None = None, radius: int = 10, timeout: int = 10) -> list[str]:
    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams

Async fetch NOTAM list from the service via ICAO, coordinate, or ident path.
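The offset loop generalizes to any paged endpoint: request a page, collect items, advance the offset, and stop once the count catches up. A synchronous toy with a fake response shaped like the FAA payload (field semantics assumed from the code above):

```python
def collect_pages(fetch_page) -> list:
    """Offset-paginated collection mirroring async_fetch's loop."""
    items: list = []
    offset = 0
    while True:
        resp = fetch_page(offset)
        items.extend(resp["notamList"])
        offset = resp["endRecordCount"]
        if not items or offset >= resp["totalNotamCount"]:
            break
    return items

# Fake two-page endpoint holding 3 total records, 2 per page
def fake_page(offset: int) -> dict:
    data = ["N1", "N2", "N3"]
    page = data[offset : offset + 2]
    return {"notamList": page, "endRecordCount": offset + len(page), "totalNotamCount": 3}

print(collect_pages(fake_page))  # ['N1', 'N2', 'N3']
```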

class NoaaGfs(avwx.service.files.NoaaForecast):
class NoaaGfs(NoaaForecast):
    """Request forecast data from NOAA GFS FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    _cycles: ClassVar[dict[str, tuple[int, ...]]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through update cycles no older than two days."""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
            stacklevel=2,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station}   GFS", f"{self.report_type.upper()} GUIDANCE"

Request forecast data from NOAA GFS FTP servers.

class NoaaNbm(avwx.service.files.NoaaForecast):
class NoaaNbm(NoaaForecast):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than two days."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station}   ", f"{self.report_type.upper()} GUIDANCE"

Request forecast data from NOAA NBM FTP servers.

class Service:
class Service:
    """Base Service class for fetching reports."""

    report_type: str
    _url: ClassVar[str] = ""
    _valid_types: ClassVar[tuple[str, ...]] = ()

    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type

    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]

Base Service class for fetching reports.

Service(report_type: str)
    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type
report_type: str
root: str | None
    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]

Return the service's root URL.
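The root property is plain string slicing: drop the scheme, then keep everything up to the first path separator. As a standalone function:

```python
def root(url: str) -> str:
    """Mirror of Service.root: host portion of a service URL."""
    url = url[url.find("//") + 2 :]  # drop "http://" / "https://"
    return url[: url.find("/")]      # keep up to the first path segment

print(root("https://nomads.ncep.noaa.gov/pub/data/"))  # nomads.ncep.noaa.gov
```

Note that, like the property above, this assumes the URL contains a scheme and at least one path separator after the host.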