avwx.service

Report Source Services

AVWX fetches the raw weather reports from third-party services via REST API calls or file downloads. We use Service objects to handle the request and extraction for us.

Basic Module Use

METARs and TAFs are the most widely supported report types, so an effort has been made to source them from regional services where possible. The get_service function was introduced to determine the best service for a given station.

# Fetch Australian reports
station = 'YWOL'
country = 'AU' # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)('metar')
# service is now avwx.service.AUBOM init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)

Other report types require specific service classes which are found in their respective submodules. However, you can normally let the report type classes handle these services for you.
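For instance, the top-level report classes create and call the appropriate service internally, so most code never needs to import avwx.service directly. A minimal sketch, assuming the avwx.Metar class and its update method:

# Let the report class manage the service for you
metar = avwx.Metar('YWOL')
metar.update()  # fetches and parses via the station's preferred service
print(metar.raw)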

Adding a New Service

If the existing services are not supplying the report(s) you need, adding a new service is easy. First, you'll need to determine if your source can be scraped or you need to download a file.

ScrapeService

For web scraping sources, you'll need to do the following things:

  • Add the base URL and method (if not "GET")
  • Implement the ScrapeService._make_url method to return the source URL and query parameters
  • Implement the ScrapeService._extract method to return just the report string (starting at the station ID) from the response

Let's look at the MAC service as an example:

class MAC(StationScrape):
    """Requests data from Meteorologia Aeronautica Civil for Columbian stations"""

    _url = "http://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url, {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report message using string finding"""
        return self._simple_extract(raw, f"{station.upper()} ", "=")

The URL and query parameters are returned by _make_url so fetch knows how to request the report. The response from this query is then passed to _extract, which returns the report or list of reports.
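Once defined, the service is used like any other station scrape service. A short usage sketch, assuming SKBO as an example Colombian station:

service = MAC('metar')
report = service.fetch('SKBO')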

Once your service is created, it can optionally be added to avwx.service.scrape.PREFERRED if the service covers all stations with a known ICAO prefix or avwx.service.scrape.BY_COUNTRY if the service covers all stations in a single country. This is how avwx.service.get_service determines the preferred service. For example, the MAC service is preferred over NOAA for all ICAOs starting with "SK" while AUBOM is better for all Australian stations.
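The registration itself is just a dictionary entry mapping a two-letter ICAO prefix or ISO country code to the service class. A sketch, assuming the MAC example class above and the two lookup dictionaries in avwx.service.scrape used by get_service:

from avwx.service import scrape

scrape.PREFERRED["SK"] = MAC    # checked first by ICAO prefix
scrape.BY_COUNTRY["CO"] = MAC   # fallback lookup by country code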

FileService

For file-based sources, you'll need to do the following things:

  • Add the base URL and valid report types
  • Implement the FileService._urls property to iterate through source URLs
  • Implement the FileService._extract method to return just the report string (starting at the station ID) from the response

Let's look at the NOAA_NBM service as an example:

class NOAA_NBM(FileService):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates no older than two days"""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start = station + "   "
        end = self.report_type.upper() + " GUIDANCE"
        txt = source.read()
        txt = txt[txt.find(start) :]
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
            if not line:
                break
            lines.append(line)
        return "\n".join(lines) or None

In this example, we iterate through _urls looking for the most recent published file. URL iterators should always have a lower bound to stop iteration so the service can return a null response.

Once the file is downloaded, the requested station and file-like object are passed to the _extract method to find and return the report from the file. This method will not be called if the file doesn't exist.
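Fetching then looks the same as with the scrape services. A minimal sketch, assuming the file services accept a station ident in fetch like the example at the top of this page:

service = NOAA_NBM('nbs')
report = service.fetch('KJFK')  # None if no recent file includes the station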

 1""".. include:: ../../docs/service.md"""
 2
 3from avwx.service.base import Service
 4from avwx.service.files import NoaaGfs, NoaaNbm
 5from avwx.service.scrape import (
 6    Amo,
 7    Aubom,
 8    Avt,
 9    FaaNotam,
10    Mac,
11    Nam,
12    Noaa,
13    Olbs,
14    get_service,
15)
16
17__all__ = (
18    "get_service",
19    "Noaa",
20    "Amo",
21    "Aubom",
22    "Avt",
23    "Mac",
24    "Nam",
25    "Olbs",
26    "FaaNotam",
27    "NoaaGfs",
28    "NoaaNbm",
29    "Service",
30)
def get_service(station: str, country_code: str) -> avwx.service.scrape.ScrapeService:
def get_service(station: str, country_code: str) -> ScrapeService:
    """Return the preferred scrape service for a given station.

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now avwx.service.Aubom init'd to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, Noaa)  # type: ignore

Return the preferred scrape service for a given station.

# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)
Noaa = <class 'avwx.service.scrape.NoaaScrape'>
class Amo(avwx.service.scrape.StationScrape):
class Amo(StationScrape):
    """Request data from AMO KMA for Korean stations."""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and parameters."""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the report message from XML response."""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][f"{self.report_type.lower()}Msg"]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            msg = "The station might not exist"
            raise self._make_err(msg)
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())

Request data from AMO KMA for Korean stations.

default_timeout = 60
class Aubom(avwx.service.scrape.StationScrape):
class Aubom(StationScrape):
    """Request data from the Australian Bureau of Meteorology."""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            msg = "The station might not exist"
            raise self._make_err(msg) from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")

Request data from the Australian Bureau of Meteorology.

method = 'POST'
class Avt(avwx.service.scrape.StationScrape):
class Avt(StationScrape):
    """Request data from AVT/XiamenAir for China.
    NOTE: This should be replaced later with a gov+https source.
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""
        else:
            return text

Request data from AVT/XiamenAir for China. NOTE: This should be replaced later with a gov+https source.

class Mac(avwx.service.scrape.StationScrape):
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")

Request data from Meteorologia Aeronautica Civil for Colombian stations.

method = 'POST'
class Nam(avwx.service.scrape.StationScrape):
class Nam(StationScrape):
    """Request data from NorthAviMet for North Atlantic and Nordic countries."""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        starts = [f">{self.report_type.upper()} <", f">{station.upper()}<", "top'>"]
        report = self._simple_extract(raw, starts, "=")
        index = report.rfind(">")
        if index > -1:
            report = report[index + 1 :]
        return f"{station} {report.strip()}"

Request data from NorthAviMet for North Atlantic and Nordic countries.

class Olbs(avwx.service.scrape.StationScrape):
class Olbs(StationScrape):
    """Request data from India OLBS flight briefing."""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(raw, [f">{self.report_type.upper()}</div>", station], ["=", "<"])

Request data from India OLBS flight briefing.

class FaaNotam(avwx.service.scrape.ScrapeService):
class FaaNotam(ScrapeService):
    """Source NOTAMs from official FAA portal."""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Add coordinate deg/min/sec fields per float value."""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order."""
        data: dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data |= self._split_coord("lat", coord.lat)
            data |= self._split_coord("long", coord.lon)
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            msg = "Not enough info to request NOTAM data"
            raise InvalidRequest(msg)
        return data

    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams

Source NOTAMs from official FAA portal.

method = 'POST'
def fetch( self, icao: str | None = None, coord: avwx.structs.Coord | None = None, path: list[str] | None = None, radius: int = 10, timeout: int = 10) -> list[str]:
    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

Fetch NOTAM list from the service via ICAO, coordinate, or ident path.
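A brief usage sketch based on the signature above; the idents and radius are only example values:

service = FaaNotam("notam")
# Search by a single location designator
notams = service.fetch(icao="KJFK")
# Or search along a flight path of idents with a wider radius
notams = service.fetch(path=["KJFK", "KBOS"], radius=25)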

async def async_fetch( self, icao: str | None = None, coord: avwx.structs.Coord | None = None, path: list[str] | None = None, radius: int = 10, timeout: int = 10) -> list[str]:
    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams

Async fetch NOTAM list from the service via ICAO, coordinate, or ident path.
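The coroutine form can be awaited inside an existing event loop; a sketch assuming standard asyncio usage:

import asyncio

async def main() -> None:
    service = FaaNotam("notam")
    notams = await service.async_fetch(icao="KJFK")
    print(len(notams))

asyncio.run(main())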

class NoaaGfs(avwx.service.files.NoaaForecast):
class NoaaGfs(NoaaForecast):
    """Request forecast data from NOAA GFS FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    _cycles: ClassVar[dict[str, tuple[int, ...]]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through update cycles no older than two days."""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
            stacklevel=2,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station}   GFS", f"{self.report_type.upper()} GUIDANCE"

Request forecast data from NOAA GFS FTP servers.

class NoaaNbm(avwx.service.files.NoaaForecast):
class NoaaNbm(NoaaForecast):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than two days."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station}   ", f"{self.report_type.upper()} GUIDANCE"

Request forecast data from NOAA NBM FTP servers.

class Service:
class Service:
    """Base Service class for fetching reports."""

    report_type: str
    _url: ClassVar[str] = ""
    _valid_types: ClassVar[tuple[str, ...]] = ()

    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type

    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]

Base Service class for fetching reports.

Service(report_type: str)
    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type
report_type: str
root: str | None
    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]

Return the service's root URL.
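For example, with the Mac service shown above, the property strips the scheme and path from _url and leaves only the host portion; an illustrative check:

service = Mac("metar")
service.root  # 'meteorologia.aerocivil.gov.co'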