# avwx.service

## Report Source Services
AVWX fetches the raw weather reports from third-party services via REST API calls or file downloads. We use Service objects to handle the request and extraction for us.
## Basic Module Use
METARs and TAFs are the most widely supported report types, so an effort has been made to localize some of these services to a regional source. The `get_service` function was introduced to determine the best service for a given station.
```python
# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country

# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs

# Fetch the current METAR
report = service.fetch(station)
```
Other report types require specific service classes, which are found in their respective submodules. However, you can normally let the report type classes handle these services for you.
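For instance, the top-level report classes pick and call the right service internally. A minimal sketch, assuming a current report is available for the station:

```python
import avwx

# Metar selects the preferred service internally
metar = avwx.Metar("YWOL")
metar.update()  # fetches and parses the current report
print(metar.raw)
```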
## Adding a New Service
If the existing services don't supply the report(s) you need, adding a new service is easy. First, you'll need to determine whether your source can be scraped or you need to download a file.
### ScrapeService
For web scraping sources, you'll need to do the following things:

- Add the base URL and method (if not `"GET"`)
- Implement the `ScrapeService._make_url` method to return the source URL and query parameters
- Implement the `ScrapeService._extract` function to return just the report string (starting at the station ID) from the response
Let's look at the `Mac` service as an example:
```python
class Mac(StationScrape):
    """Requests data from Meteorologia Aeronautica Civil for Colombian stations"""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url, {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report message using string finding"""
        return self._simple_extract(raw, f"{station.upper()} ", "=")
```
The URL and query parameters are returned by `_make_url` so `fetch` knows how to request the report. The result of this query is passed to `_extract`, which returns the report or list of reports.
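With those two methods in place, the service works like any other. A quick usage sketch, assuming the remote service is reachable (`SKBO` is Bogotá):

```python
from avwx.service.scrape import Mac

service = Mac("metar")
report = service.fetch("SKBO")  # raw METAR string
```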
Once your service is created, it can optionally be added to `avwx.service.scrape.PREFERRED` if the service covers all stations with a known ICAO prefix, or `avwx.service.scrape.BY_COUNTRY` if the service covers all stations in a single country. This is how `avwx.service.get_service` determines the preferred service. For example, the `Mac` service is preferred over NOAA for all ICAOs starting with "SK", while `Aubom` is better for all Australian stations.
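As the `get_service` source later on this page shows, `PREFERRED` maps two-letter ICAO prefixes to service classes and `BY_COUNTRY` maps country codes. A hypothetical registration sketch:

```python
from avwx.service.scrape import BY_COUNTRY, PREFERRED, Aubom, Mac

# Hypothetical entries; the real mappings ship with the library
PREFERRED["SK"] = Mac     # all ICAOs starting with "SK" (Colombia)
BY_COUNTRY["AU"] = Aubom  # all Australian stations
```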
### FileService
For file-based sources, you'll need to do the following things:

- Add the base URL and valid report types
- Implement the `FileService._urls` property to iterate through source URLs
- Implement the `FileService._extract` function to return just the report string (starting at the station ID) from the response
Let's look at the `NoaaNbm` service as an example:
```python
class NoaaNbm(FileService):
    """Requests forecast data from NOAA NBM FTP servers"""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterates through hourly updates no older than one day"""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _extract(self, station: str, source: TextIO) -> Optional[str]:
        """Returns report pulled from the saved file"""
        start = station + " "
        end = self.report_type.upper() + " GUIDANCE"
        txt = source.read()
        txt = txt[txt.find(start) :]
        txt = txt[: txt.find(end, 30)]
        lines = []
        for line in txt.split("\n"):
            if "CLIMO" not in line:
                line = line.strip()
            if not line:
                break
            lines.append(line)
        return "\n".join(lines) or None
```
In this example, we iterate through `_urls` looking for the most recent published file. URL iterators should always have a lower bound to stop iteration so the service can return a null response.
Once the file is downloaded, the requested station and file-like object are passed to the `_extract` method to find and return the report from the file. This method will not be called if the file doesn't exist.
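As with the scrape services, fetching is a single call. A usage sketch, assuming a current NBM file is published and that `fetch` follows the same station-based signature as the scrape services:

```python
from avwx.service import NoaaNbm

service = NoaaNbm("nbs")
report = service.fetch("KJFK")  # None if the station isn't in the file
```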
1""".. include:: ../../docs/service.md""" 2 3from avwx.service.base import Service 4from avwx.service.files import NoaaGfs, NoaaNbm 5from avwx.service.scrape import ( 6 Amo, 7 Aubom, 8 Avt, 9 FaaNotam, 10 Mac, 11 Nam, 12 Noaa, 13 Olbs, 14 get_service, 15) 16 17__all__ = ( 18 "get_service", 19 "Noaa", 20 "Amo", 21 "Aubom", 22 "Avt", 23 "Mac", 24 "Nam", 25 "Olbs", 26 "FaaNotam", 27 "NoaaGfs", 28 "NoaaNbm", 29 "Service", 30)
### get_service

````python
def get_service(station: str, country_code: str) -> ScrapeService:
    """Return the preferred scrape service for a given station.

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now avwx.service.Aubom init'd to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, Noaa)  # type: ignore
````
Return the preferred scrape service for a given station.
```python
# Fetch Australian reports
station = "YWOL"
country = "AU"  # can source from avwx.Station.country

# Get the station's preferred service and initialize to fetch METARs
service = avwx.service.get_service(station, country)("metar")
# service is now avwx.service.Aubom init'd to fetch METARs

# Fetch the current METAR
report = service.fetch(station)
```
### Amo

```python
class Amo(StationScrape):
    """Request data from AMO KMA for Korean stations."""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and parameters."""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the report message from XML response."""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][f"{self.report_type.lower()}Msg"]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            msg = "The station might not exist"
            raise self._make_err(msg)
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())
```
Request data from AMO KMA for Korean stations.
### Aubom

```python
class Aubom(StationScrape):
    """Request data from the Australian Bureau of Meteorology."""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            msg = "The station might not exist"
            raise self._make_err(msg) from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")
```
Request data from the Australian Bureau of Meteorology.
### Avt

```python
class Avt(StationScrape):
    """Request data from AVT/XiamenAir for China.
    NOTE: This should be replaced later with a gov+https source.
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:  # noqa: ARG002
        """Extract the reports from HTML response."""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""
        else:
            return text
```
Request data from AVT/XiamenAir for China. NOTE: This should be replaced later with a gov+https source.
### Mac

```python
class Mac(StationScrape):
    """Request data from Meteorologia Aeronautica Civil for Colombian stations."""

    _url = "https://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {"X-Requested-With": "XMLHttpRequest"}

    def _post_data(self, station: str) -> dict:
        """Return the POST form/data payload."""
        return {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the report message using string finding."""
        return self._simple_extract(raw, f"{station.upper()} ", "=")
```
Request data from Meteorologia Aeronautica Civil for Colombian stations.
### Nam

```python
class Nam(StationScrape):
    """Request data from NorthAviMet for North Atlantic and Nordic countries."""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        starts = [f">{self.report_type.upper()} <", f">{station.upper()}<", "top'>"]
        report = self._simple_extract(raw, starts, "=")
        index = report.rfind(">")
        if index > -1:
            report = report[index + 1 :]
        return f"{station} {report.strip()}"
```
Request data from NorthAviMet for North Atlantic and Nordic countries.
### Olbs

```python
class Olbs(StationScrape):
    """Request data from India OLBS flight briefing."""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> tuple[str, dict]:
        """Return a formatted URL and empty parameters."""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Return the POST form."""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Return request headers."""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": secrets.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extract the reports from HTML response."""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(raw, [f">{self.report_type.upper()}</div>", station], ["=", "<"])
```
Request data from India OLBS flight briefing.
### FaaNotam

```python
class FaaNotam(ScrapeService):
    """Source NOTAMs from official FAA portal."""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Add coordinate deg/min/sec fields per float value."""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order."""
        data: dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data |= self._split_coord("lat", coord.lat)
            data |= self._split_coord("long", coord.lon)
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            msg = "Not enough info to request NOTAM data"
            raise InvalidRequest(msg)
        return data

    def fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: str | None = None,
        coord: Coord | None = None,
        path: list[str] | None = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> list[str]:
        """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                msg = "Search criteria appears to be invalid"
                raise self._make_err(msg)
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams
```
Source NOTAMs from official FAA portal.
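A coordinate-based search sketch; the import path of `Coord` (`avwx.structs`) is an assumption:

```python
from avwx.service.scrape import FaaNotam
from avwx.structs import Coord  # assumed import path

service = FaaNotam("notam")
# Search within a 15 nm radius of a point near KLAX
notams = service.fetch(coord=Coord(lat=33.94, lon=-118.41), radius=15)
```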
#### FaaNotam.fetch

```python
def fetch(
    self,
    icao: str | None = None,
    coord: Coord | None = None,
    path: list[str] | None = None,
    radius: int = 10,
    timeout: int = 10,
) -> list[str]:
    """Fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
    return aio.run(self.async_fetch(icao, coord, path, radius, timeout))
```
Fetch NOTAM list from the service via ICAO, coordinate, or ident path.
#### FaaNotam.async_fetch

```python
async def async_fetch(
    self,
    icao: str | None = None,
    coord: Coord | None = None,
    path: list[str] | None = None,
    radius: int = 10,
    timeout: int = 10,
) -> list[str]:
    """Async fetch NOTAM list from the service via ICAO, coordinate, or ident path."""
    headers = self._make_headers()
    data = self._post_for(icao, coord, path, radius)
    notams = []
    while True:
        text = await self._call(self._url, None, headers, data, timeout)
        resp: dict = json.loads(text)
        if resp.get("error"):
            msg = "Search criteria appears to be invalid"
            raise self._make_err(msg)
        for item in resp["notamList"]:
            if report := item.get("icaoMessage", "").strip():
                report = _TAG_PATTERN.sub("", report).strip()
                if issued := item.get("issueDate"):
                    report = f"{issued}||{report}"
                notams.append(report)
        offset = resp["endRecordCount"]
        if not notams or offset >= resp["totalNotamCount"]:
            break
        data["offset"] = offset
    return notams
```
Async fetch NOTAM list from the service via ICAO, coordinate, or ident path.
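For async applications, the same search can be awaited directly. A minimal sketch using the signature above:

```python
import asyncio

from avwx.service.scrape import FaaNotam


async def main() -> None:
    service = FaaNotam("notam")
    # Search by station ident instead of coordinate
    notams = await service.async_fetch(icao="KLAX", radius=25)
    print(f"Found {len(notams)} NOTAMs")


asyncio.run(main())
```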
### NoaaGfs

```python
class NoaaGfs(NoaaForecast):
    """Request forecast data from NOAA GFS FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfsmos.{}/mdl_gfs{}.t{}z"
    _valid_types = ("mav", "mex")

    _cycles: ClassVar[dict[str, tuple[int, ...]]] = {"mav": (0, 6, 12, 18), "mex": (0, 12)}

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through update cycles no older than two days."""
        warnings.warn(
            "GFS fetch has been deprecated due to NOAA retiring the format. Migrate to NBM for similar data",
            DeprecationWarning,
            stacklevel=2,
        )
        now = dt.datetime.now(tz=dt.timezone.utc)
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            for cycle in reversed(self._cycles[self.report_type]):
                date = date.replace(hour=cycle)
                if date > now:
                    continue
                timestamp = date.strftime(r"%Y%m%d")
                hour = str(date.hour).zfill(2)
                yield self._url.format(timestamp, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station} GFS", f"{self.report_type.upper()} GUIDANCE"
```
Request forecast data from NOAA GFS FTP servers.
### NoaaNbm

```python
class NoaaNbm(NoaaForecast):
    """Request forecast data from NOAA NBM FTP servers."""

    _url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/blend/prod/blend.{}/{}/text/blend_{}tx.t{}z"
    _valid_types = ("nbh", "nbs", "nbe", "nbx")

    @property
    def _urls(self) -> Iterator[str]:
        """Iterate through hourly updates no older than two days."""
        date = dt.datetime.now(tz=dt.timezone.utc)
        cutoff = date - dt.timedelta(days=1)
        while date > cutoff:
            timestamp = date.strftime(r"%Y%m%d")
            hour = str(date.hour).zfill(2)
            yield self._url.format(timestamp, hour, self.report_type, hour)
            date -= dt.timedelta(hours=1)

    def _index_target(self, station: str) -> tuple[str, str]:
        return f"{station} ", f"{self.report_type.upper()} GUIDANCE"
```
Request forecast data from NOAA NBM FTP servers.
### Service

```python
class Service:
    """Base Service class for fetching reports."""

    report_type: str
    _url: ClassVar[str] = ""
    _valid_types: ClassVar[tuple[str, ...]] = ()

    def __init__(self, report_type: str):
        if self._valid_types and report_type not in self._valid_types:
            msg = f"'{report_type}' is not a valid report type for {self.__class__.__name__}. Expected {self._valid_types}"
            raise ValueError(msg)
        self.report_type = report_type

    @property
    def root(self) -> str | None:
        """Return the service's root URL."""
        if self._url is None:
            return None
        url = self._url[self._url.find("//") + 2 :]
        return url[: url.find("/")]
```
Base Service class for fetching reports.
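The constructor validates the requested report type against `_valid_types`, and `root` strips `_url` down to its host. A quick illustration using `NoaaNbm`:

```python
from avwx.service import NoaaNbm

service = NoaaNbm("nbs")
print(service.root)  # "nomads.ncep.noaa.gov"

# Raises ValueError: 'metar' is not a valid report type for NoaaNbm
NoaaNbm("metar")
```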