avwx.parsing.sanitization.base
Core sanitization functions that accept report-specific elements.
"""Core sanitization functions that accept report-specific elements."""

from collections.abc import Callable

from avwx.parsing.core import dedupe, is_variable_wind_direction, is_wind
from avwx.parsing.sanitization.cleaners.base import (
    CleanerListType,
    CleanItem,
    CleanPair,
    CombineItems,
    RemoveItem,
    SplitItem,
)
from avwx.parsing.sanitization.cleaners.cloud import separate_cloud_layers
from avwx.parsing.sanitization.cleaners.wind import sanitize_wind
from avwx.structs import Sanitization


def sanitize_string_with(
    replacements: dict[str, str],
) -> Callable[[str, Sanitization], str]:
    """Build a string-level sanitizer bound to the given replacement table.

    Each key of ``replacements`` found in the report body is replaced with
    its mapped value. The returned callable takes the raw report text and a
    ``Sanitization`` tracker and returns the sanitized text.
    """

    def sanitize_report_string(text: str, sans: Sanitization) -> str:
        """Sanitize the report while it is still a single string.

        Substitutions made are recorded through ``sans.log`` and
        ``sans.extra_spaces_needed`` is set when cloud-layer separation
        changed the text.
        """
        report = text.strip().upper().rstrip("=")
        # Too short to hold a station ID: hand it back as-is
        if len(report) < 4:
            return report
        # Collapse every run of whitespace into a single space
        report = " ".join(report.split())
        # The first four characters are the station ID and are never altered
        station = report[:4]
        body = report[4:]
        # Swap out each known-bad substring present in the body
        for bad, good in replacements.items():
            if bad not in body:
                continue
            body = body.replace(bad, good)
            sans.log(bad, good)
        spaced = separate_cloud_layers(body)
        if spaced != body:
            sans.extra_spaces_needed = True
        return station + spaced

    return sanitize_report_string


def sanitize_list_with(
    cleaners: CleanerListType,
) -> Callable[[list[str], Sanitization], list[str]]:
    """Build a list-level sanitizer bound to the given cleaner classes.

    Every class in ``cleaners`` is instantiated once, up front. The returned
    callable takes the report as a list of strings plus a ``Sanitization``
    tracker, mutates the list in place while cleaning, and returns the
    stripped and de-duplicated list.
    """
    active_cleaners = [cleaner_type() for cleaner_type in cleaners]

    def sanitize_report_list(wxdata: list[str], sans: Sanitization) -> list[str]:
        """Sanitize the report after it has been split into a list."""
        # Walk a snapshot from the last element to the first so that pops and
        # inserts at the current index never shift elements not yet visited.
        # NOTE(review): tokens come from the snapshot, so a token is not
        # refreshed when an earlier step merged into or cleaned that slot, and
        # later cleaners in the same pass still see the pre-pop/pre-split
        # token - confirm this is intended.
        snapshot = list(wxdata)
        for idx in range(len(snapshot) - 1, -1, -1):
            token = snapshot[idx]
            for cleaner in active_cleaners:
                # TODO: Py3.10 change to match/case on type
                if isinstance(cleaner, CombineItems):
                    # Needs a previous element to merge into
                    if not (idx and cleaner.can_handle(wxdata[idx - 1], token)):
                        continue
                    wxdata[idx - 1] += wxdata.pop(idx)
                    sans.extra_spaces_found = True
                elif isinstance(cleaner, SplitItem):
                    cut = cleaner.split_at(token)
                    if not cut:
                        continue
                    wxdata.insert(idx + 1, token[cut:])
                    wxdata[idx] = token[:cut]
                    sans.extra_spaces_needed = True
                elif isinstance(cleaner, CleanPair):
                    if not (idx and cleaner.can_handle(wxdata[idx - 1], token)):
                        continue
                    first, second = cleaner.clean(wxdata[idx - 1], token)
                    if wxdata[idx - 1] != first:
                        sans.log(wxdata[idx - 1], first)
                        wxdata[idx - 1] = first
                    if token != second:
                        sans.log(token, second)
                        wxdata[idx] = second
                    # A handled pair always ends the cleaner pass for this token
                    break
                elif not cleaner.can_handle(token):
                    continue
                elif isinstance(cleaner, RemoveItem):
                    sans.log(wxdata.pop(idx))
                elif isinstance(cleaner, CleanItem):
                    fixed = cleaner.clean(token)
                    wxdata[idx] = fixed
                    sans.log(token, fixed)
                # Reached only when the cleaner acted on this token
                if cleaner.should_break:
                    break

        # TODO: Replace with above syntax after testing?
        # May wish to keep since some elements could be checked after space needed...but so could the others?

        # Wind sanitization; index 0 is the station and is skipped
        for idx in range(1, len(wxdata)):
            token = wxdata[idx]
            if is_variable_wind_direction(token):
                # Keep only the first seven characters
                trimmed = token[:7]
                wxdata[idx] = trimmed
                sans.log(token, trimmed)
                continue
            candidate = sanitize_wind(token)
            if not is_wind(candidate):
                continue
            if candidate != token:
                sans.log(token, candidate)
            wxdata[idx] = candidate

        # Strip extra characters before dedupe
        stripped = [token.strip("./\\") for token in wxdata]
        if stripped != wxdata:
            sans.log_list(wxdata, stripped)
        deduped = dedupe(stripped, only_neighbors=True)
        if len(deduped) != len(wxdata):
            sans.duplicates_found = True
        return deduped

    return sanitize_report_list
def
sanitize_string_with( replacements: dict[str, str]) -> Callable[[str, avwx.structs.Sanitization], str]:
def sanitize_string_with(
    replacements: dict[str, str],
) -> Callable[[str, Sanitization], str]:
    """Build a string-level sanitizer bound to the given replacement table.

    Each key of ``replacements`` found in the report body is replaced with
    its mapped value. The returned callable takes the raw report text and a
    ``Sanitization`` tracker and returns the sanitized text.
    """

    def sanitize_report_string(text: str, sans: Sanitization) -> str:
        """Sanitize the report while it is still a single string.

        Substitutions made are recorded through ``sans.log`` and
        ``sans.extra_spaces_needed`` is set when cloud-layer separation
        changed the text.
        """
        report = text.strip().upper().rstrip("=")
        # Too short to hold a station ID: hand it back as-is
        if len(report) < 4:
            return report
        # Collapse every run of whitespace into a single space
        report = " ".join(report.split())
        # The first four characters are the station ID and are never altered
        station = report[:4]
        body = report[4:]
        # Swap out each known-bad substring present in the body
        for bad, good in replacements.items():
            if bad not in body:
                continue
            body = body.replace(bad, good)
            sans.log(bad, good)
        spaced = separate_cloud_layers(body)
        if spaced != body:
            sans.extra_spaces_needed = True
        return station + spaced

    return sanitize_report_string
Return a function to sanitize the report string with a given list of replacements.
def
sanitize_list_with( cleaners: list[type[avwx.parsing.sanitization.cleaners.base.CleanItem] | type[avwx.parsing.sanitization.cleaners.base.CleanPair] | type[avwx.parsing.sanitization.cleaners.base.RemoveItem] | type[avwx.parsing.sanitization.cleaners.base.SplitItem] | type[avwx.parsing.sanitization.cleaners.base.CombineItems]]) -> Callable[[list[str], avwx.structs.Sanitization], list[str]]:
def sanitize_list_with(
    cleaners: CleanerListType,
) -> Callable[[list[str], Sanitization], list[str]]:
    """Build a list-level sanitizer bound to the given cleaner classes.

    Every class in ``cleaners`` is instantiated once, up front. The returned
    callable takes the report as a list of strings plus a ``Sanitization``
    tracker, mutates the list in place while cleaning, and returns the
    stripped and de-duplicated list.
    """
    active_cleaners = [cleaner_type() for cleaner_type in cleaners]

    def sanitize_report_list(wxdata: list[str], sans: Sanitization) -> list[str]:
        """Sanitize the report after it has been split into a list."""
        # Walk a snapshot from the last element to the first so that pops and
        # inserts at the current index never shift elements not yet visited.
        # NOTE(review): tokens come from the snapshot, so a token is not
        # refreshed when an earlier step merged into or cleaned that slot, and
        # later cleaners in the same pass still see the pre-pop/pre-split
        # token - confirm this is intended.
        snapshot = list(wxdata)
        for idx in range(len(snapshot) - 1, -1, -1):
            token = snapshot[idx]
            for cleaner in active_cleaners:
                # TODO: Py3.10 change to match/case on type
                if isinstance(cleaner, CombineItems):
                    # Needs a previous element to merge into
                    if not (idx and cleaner.can_handle(wxdata[idx - 1], token)):
                        continue
                    wxdata[idx - 1] += wxdata.pop(idx)
                    sans.extra_spaces_found = True
                elif isinstance(cleaner, SplitItem):
                    cut = cleaner.split_at(token)
                    if not cut:
                        continue
                    wxdata.insert(idx + 1, token[cut:])
                    wxdata[idx] = token[:cut]
                    sans.extra_spaces_needed = True
                elif isinstance(cleaner, CleanPair):
                    if not (idx and cleaner.can_handle(wxdata[idx - 1], token)):
                        continue
                    first, second = cleaner.clean(wxdata[idx - 1], token)
                    if wxdata[idx - 1] != first:
                        sans.log(wxdata[idx - 1], first)
                        wxdata[idx - 1] = first
                    if token != second:
                        sans.log(token, second)
                        wxdata[idx] = second
                    # A handled pair always ends the cleaner pass for this token
                    break
                elif not cleaner.can_handle(token):
                    continue
                elif isinstance(cleaner, RemoveItem):
                    sans.log(wxdata.pop(idx))
                elif isinstance(cleaner, CleanItem):
                    fixed = cleaner.clean(token)
                    wxdata[idx] = fixed
                    sans.log(token, fixed)
                # Reached only when the cleaner acted on this token
                if cleaner.should_break:
                    break

        # TODO: Replace with above syntax after testing?
        # May wish to keep since some elements could be checked after space needed...but so could the others?

        # Wind sanitization; index 0 is the station and is skipped
        for idx in range(1, len(wxdata)):
            token = wxdata[idx]
            if is_variable_wind_direction(token):
                # Keep only the first seven characters
                trimmed = token[:7]
                wxdata[idx] = trimmed
                sans.log(token, trimmed)
                continue
            candidate = sanitize_wind(token)
            if not is_wind(candidate):
                continue
            if candidate != token:
                sans.log(token, candidate)
            wxdata[idx] = candidate

        # Strip extra characters before dedupe
        stripped = [token.strip("./\\") for token in wxdata]
        if stripped != wxdata:
            sans.log_list(wxdata, stripped)
        deduped = dedupe(stripped, only_neighbors=True)
        if len(deduped) != len(wxdata):
            sans.duplicates_found = True
        return deduped

    return sanitize_report_list
Return a function to sanitize the report list with a given list of cleaners.