avwx.service.scrape

These services request reports via HTML scraping or direct API requests. Requests are ephemeral and will call the selected service each time.
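
A minimal usage sketch (the station ID is a placeholder, and `NOAA` is assumed to be re-exported from `avwx.service` like the other service classes):

```python
import avwx.service

# NOAA (an alias for NOAA_Scrape) is the default METAR/TAF source
service = avwx.service.NOAA("metar")
# Each fetch issues a fresh request; nothing is cached between calls
report = service.fetch("KJFK")
```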

  1"""
  2These services request reports via HTML scraping or direct API requests.
  3Requests are ephemeral and will call the selected service each time.
  4"""
  5
  6# pylint: disable=arguments-differ,invalid-name,too-many-arguments
  7
  8# stdlib
  9import asyncio as aio
 10import json
 11import random
 12import re
 13from contextlib import suppress
 14from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union
 15
 16# library
 17from xmltodict import parse as parsexml  # type: ignore
 18
 19# module
 20from avwx.parsing.core import dedupe
 21from avwx.exceptions import InvalidRequest
 22from avwx.service.base import CallsHTTP, Service
 23from avwx.station import valid_station, Station
 24from avwx.structs import Coord
 25
 26
 27_T = TypeVar("_T")
 28
 29
 30_USER_AGENTS = [
 31    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15"
 32    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.1 Safari/605.1.15",
 33    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0",
 34    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
 35    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0",
 36    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
 37]


class ScrapeService(Service, CallsHTTP):  # pylint: disable=too-few-public-methods
    """Service class for fetching reports via direct web requests

    Unless overwritten, this class accepts `"metar"` and `"taf"` as valid report types
    """

    default_timeout = 10
    _valid_types: Tuple[str, ...] = ("metar", "taf")
    _strip_whitespace: bool = True

    def _make_err(self, body: str, key: str = "report path") -> InvalidRequest:
        """Returns an InvalidRequest exception with formatted error message"""
        msg = f"Could not find {key} in {self.__class__.__name__} response\n"
        return InvalidRequest(msg + body)

    @staticmethod
    def _make_headers() -> dict:
        """Returns request headers"""
        return {}

    def _post_data(self, station: str) -> dict:  # pylint: disable=unused-argument
        """Returns the POST form/data payload"""
        return {}

    def _clean_report(self, report: _T) -> _T:
        """Replaces all *whitespace elements with a single space if enabled"""
        if not self._strip_whitespace:
            return report
        if isinstance(report, list):
            return dedupe(" ".join(r.split()) for r in report)  # type: ignore
        return " ".join(report.split()) if isinstance(report, str) else report  # type: ignore
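
# Example (illustrative sketch, not part of the library): the effect of
# _clean_report when _strip_whitespace is enabled. Runs of whitespace
# collapse to single spaces. Assumes the base Service takes the report
# type as its only init argument.
def _example_clean_report() -> None:
    service = ScrapeService("metar")
    cleaned = service._clean_report("KJFK 261751Z  10004KT\n10SM")
    assert cleaned == "KJFK 261751Z 10004KT 10SM"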


class StationScrape(ScrapeService):
    """Service class fetching reports from a station code"""

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        raise NotImplementedError()

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report string from the service response"""
        raise NotImplementedError()

    def _simple_extract(self, raw: str, starts: Union[str, List[str]], end: str) -> str:
        """Simple extract by cutting at sequential start and end points"""
        targets = [starts] if isinstance(starts, str) else starts
        for target in targets:
            index = raw.find(target)
            if index == -1:
                raise self._make_err("The station might not exist")
            raw = raw[index:]
        report = raw[: raw.find(end)].strip()
        return " ".join(dedupe(report.split()))

    async def _fetch(self, station: str, url: str, params: dict, timeout: int) -> str:
        headers = self._make_headers()
        data = self._post_data(station) if self.method.lower() == "post" else None
        text = await self._call(
            url, params=params, headers=headers, data=data, timeout=timeout
        )
        report = self._extract(text, station)
        return self._clean_report(report)

    def fetch(
        self,
        station: str,
        timeout: Optional[int] = None,
    ) -> str:
        """Fetches a report string from the service"""
        return aio.run(self.async_fetch(station, timeout))

    async def async_fetch(self, station: str, timeout: Optional[int] = None) -> str:
        """Asynchronously fetch a report string from the service"""
        if timeout is None:
            timeout = self.default_timeout
        valid_station(station)
        url, params = self._make_url(station)
        return await self._fetch(station, url, params, timeout)
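
# Example (illustrative sketch): a minimal StationScrape subclass. The URL
# and response format are hypothetical placeholders; real services override
# _make_url and _extract as the classes below do.
class _ExampleService(StationScrape):
    """Hypothetical service whose response body is the bare report"""

    _url = "https://example.com/reports/{}"

    def _make_url(self, station: str) -> Tuple[str, dict]:
        return self._url.format(station), {"format": "raw"}

    def _extract(self, raw: str, station: str) -> str:
        return raw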


# Multiple sources for NOAA data


class NOAA_FTP(StationScrape):
    """Requests data from NOAA via FTP"""

    _url = "https://tgftp.nws.noaa.gov/data/{}/{}/stations/{}.TXT"

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        root = "forecasts" if self.report_type == "taf" else "observations"
        return self._url.format(root, self.report_type, station), {}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report using string finding"""
        raw = raw[raw.find(station) :]
        return raw[: raw.find('"')]
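
# Example (illustrative sketch): fetching a TAF from the NOAA FTP mirror.
# The station ID is a placeholder.
def _example_fetch_taf_ftp() -> str:
    return NOAA_FTP("taf").fetch("KJFK")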


class _NOAA_ScrapeURL:
    """Mixin implementing NOAA scrape service URL"""

    # pylint: disable=too-few-public-methods

    report_type: str
    _url = "https://aviationweather.gov/cgi-bin/data/{}.php"

    def _make_url(self, station: str, **kwargs: Union[int, str]) -> Tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        hours = 7 if self.report_type == "taf" else 2
        params = {"ids": station, "format": "raw", "hours": hours, **kwargs}
        return self._url.format(self.report_type), params


class NOAA_Scrape(_NOAA_ScrapeURL, StationScrape):
    """Requests data from NOAA via response scraping"""

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the first report"""
        report = ""
        for line in raw.strip().split("\n"):
            # Break when seeing the second non-indented line (next report)
            if line and line[0].isalnum() and report:
                break
            report += line
        return report


class NOAA_ScrapeList(_NOAA_ScrapeURL, ScrapeService):
    """Request listed data from NOAA via response scraping"""

    _valid_types = ("pirep",)

    def _extract(self, raw: str, station: str) -> List[str]:
        """Extracts the report strings"""
        return raw.strip().split("\n")

    async def _fetch(
        self, station: str, url: str, params: dict, timeout: int
    ) -> List[str]:
        headers = self._make_headers()
        data = self._post_data(station) if self.method.lower() == "post" else None
        text = await self._call(
            url, params=params, headers=headers, data=data, timeout=timeout
        )
        report = self._extract(text, station)
        return self._clean_report(report)

    def fetch(
        self,
        icao: Optional[str] = None,
        coord: Optional[Coord] = None,
        radius: int = 10,
        timeout: Optional[int] = None,
    ) -> List[str]:
        """Fetches a list of report strings from the service"""
        return aio.run(self.async_fetch(icao, coord, radius, timeout))

    async def async_fetch(
        self,
        icao: Optional[str] = None,
        coord: Optional[Coord] = None,
        radius: int = 10,
        timeout: Optional[int] = None,
    ) -> List[str]:
        """Asynchronously fetch a list of report strings from the service"""
        if timeout is None:
            timeout = self.default_timeout
        station: str
        if icao:
            valid_station(icao)
            station = icao
        elif coord:
            if ret := Station.nearest(coord.lat, coord.lon, max_coord_distance=radius):
                station = ret[0].icao or ""
            else:
                raise ValueError(
                    f"No reference station near enough to {coord} to call service"
                )
        else:
            raise ValueError("Either icao or coord is required to call service")
        url, params = self._make_url(station, distance=radius)
        return await self._fetch(station, url, params, timeout)
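
# Example (illustrative sketch): fetch PIREPs near a coordinate. The values
# are placeholders, and Coord is assumed to accept (lat, lon) positionally.
def _example_fetch_pireps() -> List[str]:
    return NOAA_ScrapeList("pirep").fetch(coord=Coord(42.36, -71.01), radius=50)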


NOAA = NOAA_Scrape


# Regional data sources


class AMO(StationScrape):
    """Requests data from AMO KMA for Korean stations"""

    _url = "http://amoapi.kma.go.kr/amoApi/{}"
    default_timeout = 60

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url.format(self.report_type), {"icao": station}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report message from XML response"""
        resp = parsexml(raw)
        try:
            report = resp["response"]["body"]["items"]["item"][
                f"{self.report_type.lower()}Msg"
            ]
        except KeyError as key_error:
            raise self._make_err(raw) from key_error
        if not report:
            raise self._make_err("The station might not exist")
        # Replace line breaks
        report = report.replace("\n", "")
        # Remove excess leading and trailing data
        for item in (self.report_type.upper(), "SPECI"):
            if report.startswith(f"{item} "):
                report = report[len(item) + 1 :]
        report = report.rstrip("=")
        # Make every element single-spaced and stripped
        return " ".join(report.split())
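
# Example (illustrative sketch): AMO uses a longer default timeout since the
# KMA API can be slow to respond. The station ID is a placeholder.
def _example_fetch_korean_metar() -> str:
    return AMO("metar").fetch("RKSI")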


class MAC(StationScrape):
    """Requests data from Meteorologia Aeronautica Civil for Colombian stations"""

    _url = "http://meteorologia.aerocivil.gov.co/expert_text_query/parse"
    method = "POST"

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url, {"query": f"{self.report_type} {station}"}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the report message using string finding"""
        return self._simple_extract(raw, f"{station.upper()} ", "=")


class AUBOM(StationScrape):
    """Requests data from the Australian Bureau of Meteorology"""

    _url = "http://www.bom.gov.au/aviation/php/process.php"
    method = "POST"

    def _make_url(self, _: Any) -> Tuple[str, dict]:
        """Returns a formatted URL and empty parameters"""
        return self._url, {}

    @staticmethod
    def _make_headers() -> dict:
        """Returns request headers"""
        return {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "*/*",
            "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.bom.gov.au",
            "Origin": "http://www.bom.gov.au",
            "User-Agent": random.choice(_USER_AGENTS),
            "Connection": "keep-alive",
        }

    def _post_data(self, station: str) -> dict:
        """Returns the POST form"""
        return {"keyword": station, "type": "search", "page": "TAF"}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the reports from HTML response"""
        index = 1 if self.report_type == "taf" else 2
        try:
            report = raw.split("<p")[index]
            report = report[report.find(">") + 1 :]
        except IndexError as index_error:
            raise self._make_err("The station might not exist") from index_error
        if report.startswith("<"):
            return ""
        report = report[: report.find("</p>")]
        return report.replace("<br />", " ")
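
# Example (illustrative sketch): AUBOM sends a browser-like POST form, but
# the caller still just passes a station ID. The ID is a placeholder.
def _example_fetch_australian_taf() -> str:
    return AUBOM("taf").fetch("YSSY")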


class OLBS(StationScrape):
    """Requests data from India OLBS flight briefing"""

    # _url = "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/showopmetquery.php"
    # method = "POST"

    # Temp redirect
    _url = "https://avbrief3.el.r.appspot.com/"

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and parameters"""
        return self._url, {"icao": station}

    def _post_data(self, station: str) -> dict:
        """Returns the POST form"""
        # Can set icaos to "V*" to return all results
        return {"icaos": station, "type": self.report_type}

    @staticmethod
    def _make_headers() -> dict:
        """Returns request headers"""
        return {
            # "Content-Type": "application/x-www-form-urlencoded",
            # "Accept": "text/html, */*; q=0.01",
            # "Accept-Language": "en-us",
            "Accept-Encoding": "gzip, deflate, br",
            # "Host": "olbs.amsschennai.gov.in",
            "User-Agent": random.choice(_USER_AGENTS),
            "Connection": "keep-alive",
            # "Referer": "https://olbs.amsschennai.gov.in/nsweb/FlightBriefing/",
            # "X-Requested-With": "XMLHttpRequest",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "https://avbrief3.el.r.appspot.com/",
            "Host": "avbrief3.el.r.appspot.com",
        }

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the reports from HTML response"""
        # start = raw.find(f"{self.report_type.upper()} {station} ")
        return self._simple_extract(
            raw, [f">{self.report_type.upper()}</div>", station], "="
        )


class NAM(StationScrape):
    """Requests data from NorthAviMet for North Atlantic and Nordic countries"""

    _url = "https://www.northavimet.com/NamConWS/rest/opmet/command/0/"

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and empty parameters"""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the reports from HTML response"""
        starts = [f"<b>{self.report_type.upper()} <", f">{station.upper()}<", "<b> "]
        report = self._simple_extract(raw, starts, "=")
        return station + report[3:]


class AVT(StationScrape):
    """Requests data from AVT/XiamenAir for China
    NOTE: This should be replaced later with a gov+https source
    """

    _url = "http://www.avt7.com/Home/AirportMetarInfo?airport4Code="

    def _make_url(self, station: str) -> Tuple[str, dict]:
        """Returns a formatted URL and empty parameters"""
        return self._url + station, {}

    def _extract(self, raw: str, station: str) -> str:
        """Extracts the reports from HTML response"""
        try:
            data = json.loads(raw)
            key = f"{self.report_type.lower()}ContentList"
            text: str = data[key]["rows"][0]["content"]
            return text
        except (TypeError, json.decoder.JSONDecodeError, KeyError, IndexError):
            return ""


# Ancillary scrape services


_TAG_PATTERN = re.compile(r"<[^>]*>")

# Search fields https://notams.aim.faa.gov/NOTAM_Search_User_Guide_V33.pdf


class FAA_NOTAM(ScrapeService):
    """Sources NOTAMs from official FAA portal"""

    _url = "https://notams.aim.faa.gov/notamSearch/search"
    method = "POST"
    _valid_types = ("notam",)

    @staticmethod
    def _make_headers() -> dict:
        return {"Content-Type": "application/x-www-form-urlencoded"}

    @staticmethod
    def _split_coord(prefix: str, value: float) -> dict:
        """Adds coordinate deg/min/sec fields per float value"""
        degree, minute, second = Coord.to_dms(value)
        if prefix == "lat":
            key = "latitude"
            direction = "N" if degree >= 0 else "S"
        else:
            key = "longitude"
            direction = "E" if degree >= 0 else "W"
        return {
            f"{prefix}Degrees": abs(degree),
            f"{prefix}Minutes": minute,
            f"{prefix}Seconds": second,
            f"{key}Direction": direction,
        }

    def _post_for(
        self,
        icao: Optional[str] = None,
        coord: Optional[Coord] = None,
        path: Optional[List[str]] = None,
        radius: int = 10,
    ) -> dict:
        """Generate POST payload for search params in location order"""
        data: Dict[str, Any] = {"notamsOnly": False, "radius": radius}
        if icao:
            data["searchType"] = 0
            data["designatorsForLocation"] = icao
        elif coord:
            data["searchType"] = 3
            data["radiusSearchOnDesignator"] = False
            data.update(self._split_coord("lat", coord.lat))
            data.update(self._split_coord("long", coord.lon))
        elif path:
            data["searchType"] = 6
            data["flightPathText"] = " ".join(path)
            data["flightPathBuffer"] = radius
            data["flightPathIncludeNavaids"] = True
            data["flightPathIncludeArtcc"] = False
            data["flightPathIncludeTfr"] = True
            data["flightPathIncludeRegulatory"] = False
            data["flightPathResultsType"] = "All NOTAMs"
        else:
            raise InvalidRequest("Not enough info to request NOTAM data")
        return data

    def fetch(
        self,
        icao: Optional[str] = None,
        coord: Optional[Coord] = None,
        path: Optional[List[str]] = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> List[str]:
        """Fetch NOTAM list from the service via ICAO, coordinate, or ident path"""
        return aio.run(self.async_fetch(icao, coord, path, radius, timeout))

    async def async_fetch(
        self,
        icao: Optional[str] = None,
        coord: Optional[Coord] = None,
        path: Optional[List[str]] = None,
        radius: int = 10,
        timeout: int = 10,
    ) -> List[str]:
        """Asynchronously fetch NOTAM list from the service via ICAO, coordinate, or ident path"""
        headers = self._make_headers()
        data = self._post_for(icao, coord, path, radius)
        notams = []
        while True:
            text = await self._call(self._url, None, headers, data, timeout)
            resp: dict = json.loads(text)
            if resp.get("error"):
                raise self._make_err("Search criteria appears to be invalid")
            for item in resp["notamList"]:
                if report := item.get("icaoMessage", "").strip():
                    report = _TAG_PATTERN.sub("", report).strip()
                    if issued := item.get("issueDate"):
                        report = f"{issued}||{report}"
                    notams.append(report)
            offset = resp["endRecordCount"]
            if not notams or offset >= resp["totalNotamCount"]:
                break
            data["offset"] = offset
        return notams
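
# Example (illustrative sketch): the three mutually exclusive search modes.
# Station, coordinate, and path values are placeholders, and Coord is
# assumed to accept (lat, lon) positionally.
def _example_fetch_notams() -> List[str]:
    service = FAA_NOTAM("notam")
    by_icao = service.fetch(icao="KJFK")
    by_coord = service.fetch(coord=Coord(42.36, -71.01), radius=25)
    by_path = service.fetch(path=["KLGA", "KPHL", "KIAD"], radius=10)
    return by_icao + by_coord + by_path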


PREFERRED = {
    "RK": AMO,
    "SK": MAC,
}
BY_COUNTRY = {
    "AU": AUBOM,
    # "CN": AVT,
    "DK": NAM,
    "EE": NAM,
    "FI": NAM,
    "FO": NAM,
    "GL": NAM,
    "IN": OLBS,
    "IS": NAM,
    "LV": NAM,
    "NO": NAM,
    "SE": NAM,
}


def get_service(station: str, country_code: str) -> ScrapeService:
    """Returns the preferred scrape service for a given station

    ```python
    # Fetch Australian reports
    station = "YWOL"
    country = "AU"  # can source from avwx.Station.country
    # Get the station's preferred service and initialize it to fetch METARs
    service = avwx.service.get_service(station, country)("metar")
    # service is now an avwx.service.AUBOM instance set to fetch METARs
    # Fetch the current METAR
    report = service.fetch(station)
    ```
    """
    with suppress(KeyError):
        return PREFERRED[station[:2]]  # type: ignore
    return BY_COUNTRY.get(country_code, NOAA)  # type: ignore