diff --git a/examples/SHEF/RR8MSR.txt b/examples/SHEF/RR8MSR.txt new file mode 100644 index 00000000..beb6ce64 --- /dev/null +++ b/examples/SHEF/RR8MSR.txt @@ -0,0 +1,14 @@ +751 +SRUS83 KMSR 191448 +RR8MSR + +.E SOMW3 1019 C DH0900/DIN05/5.06/5.07/5.07/5.06/5.06/5.07/5.06 +.E1 5.07/5.06/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07 +.E2 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07 +.E3 5.07/5.07/5.06/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.06 +.E4 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07 +.E5 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.06/5.07/5.07/5.07/5.07 +.E6 5.07/5.07/5.07/5.07/5.07/5.06/5.07/5.07/5.07/5.07/5.07/5.06 +.E7 5.07/5.06/5.06/5.07/5.07/5.07/5.06/5.07/5.07/5.07/5.07/5.07 +.E8 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07 +.E9 5.07/5.07/5.07/5.07/5.07 diff --git a/examples/SHEF/RRSNMC.txt b/examples/SHEF/RRSNMC.txt new file mode 100644 index 00000000..51a336eb --- /dev/null +++ b/examples/SHEF/RRSNMC.txt @@ -0,0 +1,6 @@ +262 +SXUS72 KWBC 280332 +RRSNMC +:&&HADS SOR REPORT FOR USER NMC +.A ALBW3 20231028 DH0300/PPHRG M/PPDRG -9/USIRG 10 + diff --git a/parsers/pywwa/workflows/shef_parser.py b/parsers/pywwa/workflows/shef_parser.py index 264f6262..e1bfdfa4 100644 --- a/parsers/pywwa/workflows/shef_parser.py +++ b/parsers/pywwa/workflows/shef_parser.py @@ -3,19 +3,19 @@ import datetime import random import re +from collections import namedtuple from typing import List from zoneinfo import ZoneInfo # 3rd Party # pylint: disable=no-name-in-module from psycopg.errors import DeadlockDetected -from pyiem import reference from pyiem.models.shef import SHEFElement from pyiem.nws.products.shef import parser from pyiem.observation import Observation from pyiem.util import LOG, convert_value, utc from twisted.internet import reactor -from twisted.internet.task import LoopingCall +from twisted.internet.task import LoopingCall, deferLater # Local from pywwa import common @@ -38,9 +38,24 @@ # station metadata LOCS = {} # database timezones to cache -TIMEZONES = {} +TIMEZONES = { + "UTC": ZoneInfo("UTC"), + "America/Chicago": ZoneInfo("America/Chicago"), +} # a queue for saving database IO CURRENT_QUEUE = {} +# Data structure to hold potential writes to IEMAccess +# [iemid] -> ACCESSDB_ENTRY -> records -> localts -> {} +ACCESSDB_QUEUE = {} +ACCESSDB_ENTRY = namedtuple( + "ACCESSDB_ENTRY", + [ + "station", + "network", + "tzname", + "records", + ], +) U1980 = utc(1980) # Networks that can come via SHEF backdoors DOUBLEBACKED_NETWORKS = ["ISUSM", "IA_RWIS"] @@ -245,6 +260,7 @@ def save_current() -> int: continue cnt += 1 (sid, varname, _depth) = k.split("|") + # This is unlikely, but still GIGO sometimes :/ if len(varname) != 7: LOG.info("Got varname of '%s' somehow? %s", varname, mydict) continue @@ -289,8 +305,12 @@ def get_localtime(sid, ts): def get_network(prod, sid, data: List[SHEFElement]) -> str: - """Figure out which network this belongs to""" - networks = list(LOCS.get(sid, {}).keys()) + """Logic for figuring out network in face of ambiguity. + + Note: This sid should already be in LOCS, so we are only either taking + the single entry or picking between DCP and COOP variants. + """ + networks = list(LOCS[sid].keys()) # This is the best we can hope for if len(networks) == 1: return networks[0] @@ -306,44 +326,21 @@ def get_network(prod, sid, data: List[SHEFElement]) -> str: is_coop = True pnetwork = "COOP" if is_coop else "DCP" # filter networks now - networks = [s for s in networks if s.find(pnetwork) > 0] - if len(networks) == 1: - return networks[0] + for network in networks: + if network.find(pnetwork) > 0: + return network + # Throw our hands up in the air + return networks[0] + - # If networks is zero length, then we have to try some things - if not networks: - HADSDB.runInteraction(enter_unknown, sid, prod.get_product_id(), "") - if len(sid) == 5: - state = reference.nwsli2state.get(sid[3:]) - country = reference.nwsli2country.get(sid[3:]) - if country in ["CA", "MX"]: - return f"{country}_{state}_{pnetwork}" - if country == "US": - return f"{state}_{pnetwork}" - return f"{country}__{pnetwork}" - - if sid not in UNKNOWN: - UNKNOWN[sid] = True - LOG.info("failure for sid: %s tp: %s", sid, prod.get_product_id()) - return None - - -def process_site_frontend(prod, sid, data): - """Frontdoor for process_site()""" - df = ACCESSDB.runInteraction(process_site, prod, sid, data) - df.addErrback(process_site_eb, prod, sid, data) - df.addErrback(common.email_error, prod.unixtext) - df.addErrback(LOG.error) - - -def process_site(accesstxn, prod, sid, data): +def process_site(prod, sid, data): """Consumption of rectified data.""" # Order the timestamps so that we process the newest data last, so that # all obs are potentially processed through iemaccess times = list(data.keys()) times.sort() for tstamp in times: - process_site_time(accesstxn, prod, sid, tstamp, data[tstamp]) + process_site_time(prod, sid, tstamp, data[tstamp]) def insert_raw_inbound(cursor, element: SHEFElement) -> int: @@ -391,46 +388,45 @@ def update_current_queue(element: SHEFElement, product_id: str): cur["dirty"] = True -def process_site_time(accesstxn, prod, sid, ts, elements: List[SHEFElement]): +def process_site_time(prod, sid, ts, elements: List[SHEFElement]): """Ingest for IEMAccess.""" network = get_network(prod, sid, elements) if network is None or network in DOUBLEBACKED_NETWORKS: return - localts = get_localtime(sid, ts) + # This should always work! + metadata = LOCS[sid][network] # Do not send DCP sites with old data to IEMAccess - if network.find("_DCP") > 0 and localts < LOCS.get(sid, {}).get( - "valid", localts - ): - return - metadata = LOCS.get(sid, {}).get(network) - if metadata is None: - LOG.info("Unknown station: %s %s", sid, network) + if network.find("_DCP") > 0 and localts < metadata["valid"]: return metadata["valid"] = localts # Okay, time for a hack, if our observation is at midnight! if localts.hour == 0 and localts.minute == 0: localts -= datetime.timedelta(minutes=1) - # LOG.info("Shifting %s [%s] back one minute: %s" % (sid, network, - # localts)) - iemob = Observation( - iemid=metadata["iemid"], - valid=localts, - tzname=metadata["tzname"], + record = ACCESSDB_QUEUE.setdefault( + metadata["iemid"], + ACCESSDB_ENTRY( + station=sid, + network=network, + tzname=metadata["tzname"], + records={}, + ), + ).records.setdefault( + localts, + { + "last": U1980, + "data": {}, + "product_id": prod.get_product_id(), + }, ) - # This is likely going to kill performance, but we need to do it for now - iemob.load(accesstxn) - iscoop = network.find("COOP") > 0 - hasdata = False - # TODO Special rstage logic in case PEDTS is defined - # pedts = metadata[network]["pedts"] + # Update last + record["last"] = utc() + report = None afos = prod.afos for se in elements: - if se.type != "R": - continue if se.narrative: report = se.narrative if ( @@ -446,24 +442,22 @@ def process_site_time(accesstxn, prod, sid, ts, elements: List[SHEFElement]): # We generally are in english units, this also converts the wind # direction to the full degrees val = se.to_english() - # TODO it is not clear if we can hit this code or not if val is None: # Behold, glorious hack here to force nulls into the summary # database that uses coerce - iemob.data[f"null_{iemvar}"] = None - hasdata = True - iemob.data[iemvar] = val + record["data"][f"null_{iemvar}"] = None + record["data"][iemvar] = val if iemvar in ["pday", "snow", "snowd"]: # Prevent negative numbers if val is not None and val < 0: - iemob.data[iemvar] = 0 + record["data"][iemvar] = 0 if iemvar in ["sknt", "gust"] and val is not None: # mph to knots :/ val = convert_value(val, "mile / hour", "knot") - if iscoop: + if network.find("_COOP") > 0: # Save COOP 'at-ob' temperature into summary table if iemvar == "tmpf": - iemob.data["coop_tmpf"] = val + record["data"]["coop_tmpf"] = val # Save observation time into the summary table if iemvar in [ "tmpf", @@ -473,21 +467,26 @@ def process_site_time(accesstxn, prod, sid, ts, elements: List[SHEFElement]): "snow", "snowd", ]: - iemob.data["coop_valid"] = iemob.data["valid"] - # if pedts is not None and f"{pedts}Z" in data: - # val = None if data[f"{pedts}Z"] < -9998 else data[f"{pedts}Z"] - # iemob.data["rstage"] = val + record["data"]["coop_valid"] = localts - if not hasdata: - return - iemob.data["raw"] = prod.get_product_id() - iemob.data["report"] = report - # Only force COOP data into current_log even if we have newer obs - if not iemob.save(accesstxn, force_current_log=iscoop): - LOG.info("Failed to save %s %s", sid, iemob) - HADSDB.runInteraction( - enter_unknown, sid, prod.get_product_id(), network - ) + record["data"]["raw"] = prod.get_product_id() + record["data"]["report"] = report + + +def write_access_records(accesstxn, records: [], iemid, entry: ACCESSDB_ENTRY): + """Batch the records to to prevent deadlocks, maybe!""" + for localts, record in records: + write_access_record(accesstxn, record, iemid, localts, entry) + + +def write_access_record( + accesstxn, record: dict, iemid, localts, entry: ACCESSDB_ENTRY +): + """The batched database write.""" + iemob = Observation(iemid=iemid, valid=localts, tzname=entry.tzname) + iemob.data.update(record["data"]) + iscoop = entry.network.find("_COOP") > 0 + iemob.save(accesstxn, force_current_log=iscoop) def log_database_queue_size(): @@ -500,20 +499,28 @@ def log_database_queue_size(): ) -def process_site_eb(err, prod, sid, data): +def write_access_records_eb(err, records: list, iemid, entry: ACCESSDB_ENTRY): """Errorback from process_site transaction.""" if isinstance(err.value, DeadlockDetected): jitter = random.randint(0, 30) LOG.info( - "Database Deadlock: prod:%s sid:%s, retrying in %s seconds", - prod.get_product_id(), - sid, + "Database Deadlock: %s[%s], retrying in %s seconds", + entry.station, + entry.network, + jitter, + ) + df = deferLater( + reactor, jitter, + ACCESSDB.runInteraction, + write_access_records, + records, + iemid, + entry, ) - reactor.callLater(jitter, process_site_frontend, prod, sid, data) + df.addErrback(common.email_error) return - msg = f"process_site({prod.get_product_id()}, {sid}, {data}) got {err}" - common.email_error(err, msg) + common.email_error(err, f"write_access_entry({entry.station}) got {err}") def process_data(text): @@ -538,7 +545,7 @@ def process_data(text): if sid in UNKNOWN: continue if sid in LOCS: - process_site_frontend(prod, sid, data) + process_site(prod, sid, data) else: UNKNOWN[sid] = True if NWSLIRE.match(sid) is not None: @@ -547,6 +554,41 @@ def process_data(text): return prod +def process_accessdb_frontend(): + """Catch exceptions so that Looping call keeps going.""" + try: + process_accessdb() + except Exception as exp: + LOG.exception(exp) + + +def process_accessdb(): + """Queue up work to do.""" + threshold = utc() - datetime.timedelta(minutes=5) + for iemid, entry in ACCESSDB_QUEUE.items(): + records = [] + for localts in list(entry.records.keys()): + # Allow a window of time to accumulate data prior to writing + if entry.records[localts]["last"] > threshold: + continue + # Get a reference and delete it from the dict + records.append((localts, entry.records.pop(localts))) + if records: + df = ACCESSDB.runInteraction( + write_access_records, + records, + iemid, + entry, + ) + df.addErrback( + write_access_records_eb, + records, + iemid, + entry, + ) + df.addErrback(common.email_error) + + def main2(_res): """Go main Go!""" LOG.info("main() fired!") @@ -564,6 +606,10 @@ def main2(_res): lc3 = LoopingCall(MESOSITEDB.runInteraction, load_stations) df3 = lc3.start(60 * 60 * 12, now=False) df3.addErrback(common.email_error) + # Process entries in ACCESSDB_QUEUE + lc4 = LoopingCall(process_accessdb_frontend) + df4 = lc4.start(15, now=False) + df4.addErrback(common.email_error) def main(): diff --git a/tests/workflows/test_shef_parser.py b/tests/workflows/test_shef_parser.py index dc36ed79..e1c2a82d 100644 --- a/tests/workflows/test_shef_parser.py +++ b/tests/workflows/test_shef_parser.py @@ -1,5 +1,7 @@ """Test shef_parser.""" +import datetime from functools import partial +from zoneinfo import ZoneInfo # 3rd Party # pylint: disable=no-member-in-module @@ -26,9 +28,91 @@ def run_interaction(cursor, sql, args): def sync_workflow(prod, cursor): """Common workflow.""" mydata = shef_parser.restructure_data(prod) - print(mydata) for sid, data in mydata.items(): - shef_parser.process_site(cursor, prod, sid, data) + shef_parser.process_site(prod, sid, data) + # Just exercise the API + shef_parser.process_accessdb_frontend() + + # Arm the entries for processing + ts = utc() - datetime.timedelta(days=1) + for iemid, entry in shef_parser.ACCESSDB_QUEUE.items(): + for localts, record in entry.records.items(): + record["last"] = ts + shef_parser.write_access_records( + cursor, [(localts, record)], iemid, entry + ) + # Just exercise the API + shef_parser.process_accessdb_frontend() + + +def test_accessdb_exception(): + """Test some GIGO raises an exception.""" + shef_parser.ACCESSDB_QUEUE[-99] = 0 + shef_parser.process_accessdb_frontend() + shef_parser.ACCESSDB_QUEUE.pop(-99) + + +def test_missing_value(): + """Test that this missing value flags a database write to null_ col""" + shef_parser.LOCS["ALBW3"] = { + "WI_DCP": { + "valid": shef_parser.U1980, + "iemid": -99, + "tzname": "America/Chicago", + "epoc": 1, + "pedts": 1, + } + } + pywwa.CTX.utcnow = utc(2023, 10, 28, 3, 40) + shef_parser.process_data(get_example_file("SHEF/RRSNMC.txt")) + + +def test_midnight(): + """Test that a midnight report gets moved back one minute.""" + shef_parser.LOCS["AISI4"] = { + "IA_DCP": { + "valid": shef_parser.U1980, + "iemid": -99, + "tzname": "America/Chicago", + "epoc": 1, + "pedts": 1, + } + } + pywwa.CTX.utcnow = utc(2022, 11, 10, 12) + payload = get_example_file("SHEF/RR1.txt").replace("1150", "0600") + shef_parser.process_data(payload) + assert shef_parser.ACCESSDB_QUEUE[-99].records[utc(2022, 11, 10, 5, 59)] + + +def test_old_dcpdata(): + """Test that old DCP data is not sent to the database.""" + shef_parser.LOCS["AISI4"] = { + "IA_DCP": { + "valid": shef_parser.U1980, + "iemid": -99, + "tzname": "America/Chicago", + "epoc": 1, + "pedts": 1, + } + } + pywwa.CTX.utcnow = utc(2022, 11, 10, 12) + shef_parser.process_data(get_example_file("SHEF/RR1.txt")) + # rewind to 2021 + pywwa.CTX.utcnow = utc(2021, 11, 10, 12) + shef_parser.process_data(get_example_file("SHEF/RR1.txt")) + + +def test_get_localtime(): + """Test when we don't know of a station.""" + tzinfo = ZoneInfo("America/Chicago") + assert shef_parser.get_localtime("XX_DCP123456", tzinfo) == tzinfo + + +def test_231027_rr8msr_novariable(): + """Test this found in the wild.""" + pywwa.CTX.utcnow = utc(2023, 10, 19, 15) + prod = shef_parser.process_data(get_example_file("SHEF/RR8MSR.txt")) + assert not prod.data def test_empty_product(): @@ -62,6 +146,13 @@ def test_230926_rr8krf(cursor): shef_parser.insert_raw_inbound(cursor, element) +def test_bad_element_in_current_queue(): + """Test GIGO on current_queue.""" + shef_parser.CURRENT_QUEUE.clear() + shef_parser.CURRENT_QUEUE["HI|BYE|HI"] = {"dirty": True} + shef_parser.save_current() + + @pytest.mark.parametrize("database", ["iem"]) def test_omit_report(cursor): """Test that the report is omitted...""" @@ -103,26 +194,51 @@ def test_omit_report(cursor): shef_parser.ACCESSDB.runOperation = partial(run_interaction, cursor) assert shef_parser.save_current() == 7 + assert shef_parser.save_current() == 0 -@pytest.mark.parametrize("database", ["iem"]) -def test_process_site_eb(cursor): +def test_process_site_eb(): """Test that the errorback works without any side effects.""" pywwa.CTX.utcnow = utc(2017, 8, 15, 14) - prod = shef_parser.process_data(get_example_file("RR7.txt")) - sync_workflow(prod, cursor) - shef_parser.process_site_eb(Failure(Exception("Hi Daryl")), prod, "", {}) - shef_parser.process_site_eb(Failure(DeadlockDetected()), prod, "", {}) + shef_parser.process_data(get_example_file("RR7.txt")) + entry = shef_parser.ACCESSDB_ENTRY( + station="", network="", tzname="America/Chicago", records={} + ) + record = {"data": {}, "last": utc(), "product_id": ""} + shef_parser.write_access_records_eb( + Failure(Exception("Hi Daryl")), [(utc(), record)], 0, entry + ) + shef_parser.write_access_records_eb( + Failure(DeadlockDetected()), [(utc, record)], 0, entry + ) -@pytest.mark.parametrize("database", ["iem"]) -def test_checkvars(cursor): +def test_checkvars(): """Excerise the checkvars logic with the RR1.txt example""" + # Define an entry with two networks so to make life choices + shef_parser.LOCS["AISI4"] = { + "IA_COOP": { + "valid": shef_parser.U1980, + "iemid": -99, + "tzname": "America/Chicago", + "epoc": 1, + "pedts": 1, + }, + "ISUSM": { + "valid": shef_parser.U1980, + "iemid": -99, + "tzname": "America/Chicago", + "epoc": 1, + "pedts": 1, + }, + } payload = get_example_file("SHEF/RR1.txt") pywwa.CTX.utcnow = utc(2022, 11, 10, 12) for repl in ["TX", "HG", "SF", "PPH"]: - prod = shef_parser.process_data(payload.replace("/TX", f"/{repl}")) - sync_workflow(prod, cursor) + shef_parser.process_data(payload.replace("/TX", f"/{repl}")) + shef_parser.process_data(payload.replace("RR1", "RR3")) + shef_parser.LOCS["AISI4"].pop("IA_COOP") + shef_parser.process_data(payload) def test_restructure_data_eightchar_id():