avwx.service
Report Source Services
AVWX fetches raw weather reports from third-party services via REST API calls or file downloads. Service objects handle the request and extraction steps for us.
Basic Module Use
METARs and TAFs are the most widely supported report types, so an effort has been made to source some of them from regional services. The `get_service` function was introduced to determine the best service for a given station.
```python
# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)
```
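Services can also be awaited inside an event loop. A minimal sketch, assuming the station-based services share the same `async_fetch(station)` signature seen on the scrape services below:

```python
import asyncio

import avwx.service

async def main() -> None:
    # Same preferred-service lookup as above, awaiting the coroutine variant
    service = avwx.service.get_service("YWOL", "AU")("metar")
    report = await service.async_fetch("YWOL")
    print(report)

asyncio.run(main())
```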
Other report types require specific service classes which are found in their respective submodules. However, you can normally let the report type classes handle these services for you.
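For instance, the METAR report class resolves and calls the preferred service internally, so you rarely need the lookup above directly. A short sketch:

```python
import avwx

# Metar picks the station's preferred service behind the scenes
report = avwx.Metar("YWOL")
report.update()  # fetches the raw METAR and parses it
print(report.raw)
```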
Adding a New Service
If the existing services don't supply the report(s) you need, adding a new service is easy. First, you'll need to determine whether your source can be scraped or requires a file download.
ScrapeService
For web scraping sources, you'll need to do the following things:
- Add the base URL and method (if not `"GET"`)
- Implement the `ScrapeService._make_url` function to return the source URL and query parameters
- Implement the `ScrapeService._extract` function to return just the report string (starting at the station ID) from the response
Let's look at the `Mac` service as an example:
```python
class Mac(StationScrape):
    """Requests data from Meteorologia Aeronautica Civil for Colombian stations"""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url, {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report message using string finding"""
        return self._simple_extract(raw, f"{station.upper()} ", "=")
```
Our URL and query parameters are returned by `_make_url` so `fetch` knows how to request the report. The result of this query is given to `_extract`, which returns the report or list of reports.
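Once defined, the new class behaves like any other station scrape service. A brief usage sketch, with SKBO (Bogotá) chosen purely as an illustration:

```python
from avwx.service.scrape import Mac

service = Mac("metar")
# Request and extract the current report for a Colombian station
report = service.fetch("SKBO")
```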
Once your service is created, it can optionally be added to `avwx.service.scrape.PREFERRED` if the service covers all stations with a known ICAO prefix, or to `avwx.service.scrape.BY_COUNTRY` if the service covers all stations in a single country. This is how `avwx.service.get_service` determines the preferred service. For example, the `Mac` service is preferred over `Noaa` for all ICAOs starting with "SK", while `Aubom` is better for all Australian stations.
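As a sketch of that registration: per the `get_service` source below, both mappings key to a service class, `PREFERRED` by two-letter ICAO prefix and `BY_COUNTRY` by country code. `MyService` and the `"XX"` keys here are hypothetical, not part of the library:

```python
from avwx.service import scrape

class MyService(scrape.StationScrape):
    """Hypothetical scraper for stations with the 'XX' ICAO prefix."""

# get_service checks PREFERRED by station[:2], then BY_COUNTRY by country code
scrape.PREFERRED["XX"] = MyService
scrape.BY_COUNTRY["XX"] = MyService
```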
FileService
For file-based sources, you'll need to do the following things:
- Add the base URL and valid report types
- Implement the `FileService._urls` property to iterate through source URLs
- Implement the `FileService._extract` function to return just the report string (starting at the station ID) from the response
Let's look at the `NoaaNbm` service as an example:
```python
class NoaaNbm(FileService):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates no older than one day"""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start = station + " "
        end = self.report_type.upper() + " GUIDANCE"
        txt = source.read()
        txt = txt[txt.find(start) :]
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
                if not line:
                    break
                lines.append(line)
        return "\n".join(lines) or None
```
In this example, we iterate through `_urls` looking for the most recent published file. URL iterators should always have a lower bound to stop iteration so the service can return a null response.

Once the file is downloaded, the requested station and file-like object are passed to the `_extract` method to find and return the report from the file. This method will not be called if the file doesn't exist.
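Usage mirrors the scrape services. A brief sketch, assuming NBS guidance is currently published for the station:

```python
from avwx.service.files import NoaaNbm

service = NoaaNbm("nbs")
# Downloads the most recent NBS file and extracts the station's block
report = service.fetch("KJFK")
```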
1""".. include:: ../../docs/service.md""" 2 3from avwx.service.base import Service 4from avwx.service.files import NoaaGfs, NoaaNbm 5from avwx.service.scrape import ( 6 Amo, 7 Aubom, 8 Avt, 9 FaaNotam, 10 Mac, 11 Nam, 12 Noaa, 13 Olbs, 14 get_service, 15) 16 17__all__ = ( 18 "get_service", 19 "Noaa", 20 "Amo", 21 "Aubom", 22 "Avt", 23 "Mac", 24 "Nam", 25 "Olbs", 26 "FaaNotam", 27 "NoaaGfs", 28 "NoaaNbm", 29 "Service", 30)
````python
def get_service(station: str, country_code: str) -> ScrapeService:
    """Return the preferred scrape service for a given station.

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now avwx.service.Aubom init'd to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, Noaa)  # type: ignore
````
```python
class Amo(StationScrape):
    """Request data from AMO KMA for Korean stations."""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and parameters."""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the report message from XML response."""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][f"{self.report_type.lower()}Msg"]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            msg = "The station might not exist"
            raise self._make_err(msg)
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())
```
```python
class Aubom(StationScrape):
    """Request data from the Australian Bureau of Meteorology."""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            msg = "The station might not exist"
            raise self._make_err(msg) from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")
```
```python
class Avt(StationScrape):
    """Request data from AVT/XiamenAir for China.
    NOTE: This should be replaced later with a gov+https source.
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""
        else:
            return text
```
```python
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")
```
```python
class Nam(StationScrape):
    """Request data from NorthAviMet for North Atlantic and Nordic countries."""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        starts = [f">{self.report_type.upper()} <", f">{station.upper()}<", "top'>"]
        report = self._simple_extract(raw, starts, "=")
        index = report.rfind(">")
        if index > -1:
            report = report[index + 1 :]
        return f"{station} {report.strip()}"
```
```python
class Olbs(StationScrape):
    """Request data from India OLBS flight briefing."""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(raw, [f">{self.report_type.upper()}</div>", station], ["=", "<"])
```
```python
class FaaNotam(ScrapeService):
    """Source NOTAMs from official FAA portal."""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Add coordinate deg/min/sec fields per float value."""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order."""
        data: dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data |= self._split_coord("lat", coord.lat)
            data |= self._split_coord("long", coord.lon)
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            msg = "Not enough info to request NOTAM data"
            raise InvalidRequest(msg)
        return data

    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams
```
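A brief usage sketch: `fetch` accepts an ICAO ident, a coordinate, or a flight path, mirroring `_post_for` above. "KJFK" is illustrative:

```python
from avwx.service.scrape import FaaNotam

service = FaaNotam("notam")
# Search by station ident; coord= or path= may be passed instead
notams = service.fetch(icao="KJFK")
```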
```python
class NoaaGfs(NoaaForecast):
    """Request forecast data from NOAA GFS FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    _cycles: ClassVar[dict[str, tuple[int, ...]]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through update cycles no older than two days."""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
            stacklevel=2,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station} GFS", f"{self.report_type.upper()} GUIDANCE"
```
```python
class NoaaNbm(NoaaForecast):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than two days."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station} ", f"{self.report_type.upper()} GUIDANCE"
```
```python
class Service:
    """Base Service class for fetching reports."""

    report_type: str
    _url: ClassVar[str] = ""
    _valid_types: ClassVar[tuple[str, ...]] = ()

    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type

    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]
```
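The `_valid_types` check in `__init__` means an unsupported report type fails fast. A small sketch using the classes above:

```python
from avwx.service.files import NoaaNbm

NoaaNbm("nbs")  # ok: "nbs" is listed in _valid_types

try:
    NoaaNbm("metar")  # not a valid NBM product
except ValueError as exc:
    print(exc)
```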