# avwx.service

## Report Source Services
AVWX fetches the raw weather reports from third-party services via REST API calls or file downloads. We use Service objects to handle the request and extraction for us.
### Basic Module Use
METARs and TAFs are the most widely supported report types, so an effort has been made to source them from regional services where possible. The `get_service` function was introduced to determine the best service for a given station.
```python
# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country
# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs
# Fetch the current METAR
report = service.fetch(station)
```
Other report types require specific service classes, which are found in their respective submodules. However, you can normally let the report type classes handle these services for you.
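For instance, the file-based forecast services live in `avwx.service.files`. A minimal sketch of both approaches, assuming `KJFK` as an example station supported by each:

```python
import avwx
from avwx.service.files import NoaaNbm

# Direct use: pick the service class yourself
service = NoaaNbm("nbs")
report = service.fetch("KJFK")  # KJFK chosen as an example station

# Typical use: the report class selects and calls a service for you
metar = avwx.Metar("KJFK")
metar.update()  # fetches the raw report via the station's service
```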
### Adding a New Service
If the existing services are not supplying the report(s) you need, adding a new service is easy. First, you'll need to determine whether your source can be scraped or needs to be downloaded as a file.
#### ScrapeService
For web scraping sources, you'll need to do the following things:

- Add the base URL and method (if not `"GET"`)
- Implement the `ScrapeService._make_url` method to return the source URL and query parameters; POST-based services can supply their form payload via `_post_data` instead
- Implement the `ScrapeService._extract` function to return just the report string (starting at the station ID) from the response
Let's look at the `Mac` service as an example:
```python
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")
```
Because this service uses POST, `_post_data` builds the form payload and `_make_headers` supplies the request headers, so `fetch` knows how to query the source. The response text is then given to `_extract`, which returns the report or list of reports.
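With those methods in place, the service behaves like any other station scraper. A usage sketch, with `SKBO` assumed as an example "SK"-prefixed station:

```python
import asyncio

from avwx.service.scrape import Mac

service = Mac("metar")
report = service.fetch("SKBO")  # SKBO assumed as an example station

# Scrape services also expose an async variant, following the same
# fetch/async_fetch pairing shown on FaaNotam below
report = asyncio.run(service.async_fetch("SKBO"))
```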
Once your service is created, it can optionally be added to `avwx.service.scrape.PREFERRED` if the service covers all stations with a known ICAO prefix, or to `avwx.service.scrape.BY_COUNTRY` if the service covers all stations in a single country. This is how `avwx.service.get_service` determines the preferred service. For example, the `Mac` service is preferred over `Noaa` for all ICAOs starting with "SK", while `Aubom` is better for all Australian stations.
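Both registries are plain mappings: `get_service` checks `PREFERRED` by ICAO prefix first, then falls back to `BY_COUNTRY` with `Noaa` as the default (see its source below). A sketch of what the entries look like; the actual tables in `avwx.service.scrape` contain more services:

```python
from avwx.service.scrape import Aubom, Mac

# ICAO-prefix overrides checked first
PREFERRED = {"SK": Mac}

# Country-code fallbacks used when no prefix matches
BY_COUNTRY = {"AU": Aubom}
```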
#### FileService
For file-based sources, you'll need to do the following things:

- Add the base URL and valid report types
- Implement the `FileService._urls` property to iterate through source URLs
- Implement the `FileService._extract` function to return just the report string (starting at the station ID) from the response
Let's look at a simplified version of the `NoaaNbm` service as an example:
```python
class NoaaNbm(FileService):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates no older than one day"""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start = station + " "
        end = self.report_type.upper() + " GUIDANCE"
        txt = source.read()
        txt = txt[txt.find(start) :]
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
            if not line:
                break
            lines.append(line)
        return "\n".join(lines) or None
```
In this example, we iterate through `_urls` looking for the most recent published file. URL iterators should always have a lower bound to stop iteration so the service can return a null response.
Once the file is downloaded, the requested station and file-like object are passed to the `_extract` method to find and return the report from the file. This method will not be called if the file doesn't exist.
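End to end, a file-based fetch looks the same as a scrape fetch. A usage sketch, with `KJFK` assumed as an NBM-supported station:

```python
from avwx.service.files import NoaaNbm

service = NoaaNbm("nbs")
report = service.fetch("KJFK")  # None when no recent file contains the station
```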
1""".. include:: ../../docs/service.md""" 2 3from avwx.service.base import Service 4from avwx.service.files import NoaaGfs, NoaaNbm 5from avwx.service.scrape import ( 6 Amo, 7 Aubom, 8 Avt, 9 FaaNotam, 10 Mac, 11 Nam, 12 Noaa, 13 Olbs, 14 get_service, 15) 16 17__all__ = ( 18 "get_service", 19 "Noaa", 20 "Amo", 21 "Aubom", 22 "Avt", 23 "Mac", 24 "Nam", 25 "Olbs", 26 "FaaNotam", 27 "NoaaGfs", 28 "NoaaNbm", 29 "Service", 30)
## get_service

````python
def get_service(station: str, country_code: str) -> ScrapeService:
    """Return the preferred scrape service for a given station.

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now avwx.service.Aubom init'd to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, Noaa)  # type: ignore
````
## Amo

Request data from AMO KMA for Korean stations.

```python
class Amo(StationScrape):
    """Request data from AMO KMA for Korean stations."""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and parameters."""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the report message from XML response."""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][f"{self.report_type.lower()}Msg"]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            msg = "The station might not exist"
            raise self._make_err(msg)
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())
```
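A usage sketch, with `RKSI` assumed as an example Korean station; note the raised `default_timeout` this service carries:

```python
from avwx.service.scrape import Amo

service = Amo("metar")
report = service.fetch("RKSI")  # RKSI assumed as an example station
```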
## Aubom

Request data from the Australian Bureau of Meteorology.

```python
class Aubom(StationScrape):
    """Request data from the Australian Bureau of Meteorology."""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            msg = "The station might not exist"
            raise self._make_err(msg) from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")
```
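This is the service `get_service` returns for Australian stations, as in the module example above:

```python
from avwx.service.scrape import Aubom

service = Aubom("taf")
report = service.fetch("YWOL")  # POST form and headers are built internally
```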
## Avt

Request data from AVT/XiamenAir for China. NOTE: This should be replaced later with a gov+https source.

```python
class Avt(StationScrape):
    """Request data from AVT/XiamenAir for China.
    NOTE: This should be replaced later with a gov+https source.
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""
        else:
            return text
```
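Since `_extract` returns an empty string on malformed or missing JSON, a miss may surface as a falsy report rather than an exception. A sketch, with `ZSAM` assumed as an example Chinese station:

```python
from avwx.service.scrape import Avt

report = Avt("metar").fetch("ZSAM") or None  # ZSAM assumed as an example station
```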
## Mac

Request data from Meteorologia Aeronautica Civil for Colombian stations.

```python
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")
```
## Nam

Request data from NorthAviMet for North Atlantic and Nordic countries.

```python
class Nam(StationScrape):
    """Request data from NorthAviMet for North Atlantic and Nordic countries."""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        starts = [f">{self.report_type.upper()} <", f">{station.upper()}<", "top'>"]
        report = self._simple_extract(raw, starts, "=")
        index = report.rfind(">")
        if index > -1:
            report = report[index + 1 :]
        return f"{station} {report.strip()}"
```
## Olbs

Request data from India OLBS flight briefing.

```python
class Olbs(StationScrape):
    """Request data from India OLBS flight briefing."""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(raw, [f">{self.report_type.upper()}</div>", station], ["=", "<"])
```
## FaaNotam

Source NOTAMs from official FAA portal.

```python
class FaaNotam(ScrapeService):
    """Source NOTAMs from official FAA portal."""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Add coordinate deg/min/sec fields per float value."""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order."""
        data: dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data |= self._split_coord("lat", coord.lat)
            data |= self._split_coord("long", coord.lon)
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            msg = "Not enough info to request NOTAM data"
            raise InvalidRequest(msg)
        return data

    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams
```
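`fetch` and `async_fetch` accept one of three location arguments: a station ident, a `Coord`, or a flight-path ident list. A usage sketch; the `Coord` import path and constructor shown here are assumptions:

```python
from avwx.service.scrape import FaaNotam
from avwx.structs import Coord  # assumed import path for Coord

service = FaaNotam("notam")

# Search by station ident
notams = service.fetch(icao="KJFK")

# Search around a coordinate with a wider radius
notams = service.fetch(coord=Coord(lat=40.64, lon=-73.78), radius=25)

# Search along a flight path of idents
notams = service.fetch(path=["KJFK", "KBOS"], radius=10)
```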
## NoaaGfs

Request forecast data from NOAA GFS FTP servers.

```python
class NoaaGfs(NoaaForecast):
    """Request forecast data from NOAA GFS FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    _cycles: ClassVar[dict[str, tuple[int, ...]]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through update cycles no older than two days."""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
            stacklevel=2,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station} GFS", f"{self.report_type.upper()} GUIDANCE"
```
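Because NOAA retired the MOS text products, iterating `_urls` now warns before yielding anything, so a fetch mostly serves to surface the deprecation. A sketch, with `KJFK` assumed as an example station:

```python
import warnings

from avwx.service.files import NoaaGfs

service = NoaaGfs("mav")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    report = service.fetch("KJFK")  # likely None now that the files are gone
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```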
## NoaaNbm

Request forecast data from NOAA NBM FTP servers.

```python
class NoaaNbm(NoaaForecast):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than two days."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station} ", f"{self.report_type.upper()} GUIDANCE"
```
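The report type selects which NBM text product to download. A usage sketch, with `KJFK` assumed as an example station:

```python
from avwx.service.files import NoaaNbm

service = NoaaNbm("nbh")  # hourly product; also "nbs", "nbe", "nbx"
report = service.fetch("KJFK")
```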
## Service

Base Service class for fetching reports.

```python
class Service:
    """Base Service class for fetching reports."""

    report_type: str
    _url: ClassVar[str] = ""
    _valid_types: ClassVar[tuple[str, ...]] = ()

    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type

    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]
```
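The base class validates the report type at construction and derives `root` from `_url`. For example:

```python
from avwx.service.files import NoaaNbm

try:
    NoaaNbm("mav")  # "mav" is a NoaaGfs type, not an NBM one
except ValueError as exc:
    print(exc)  # 'mav' is not a valid report type for NoaaNbm...

print(NoaaNbm("nbs").root)  # nomads.ncep.noaa.gov
```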