From 4f9581bb0c0d04b62c0eaa27ff66a04af5f8e619 Mon Sep 17 00:00:00 2001 From: akrherz Date: Sat, 28 Oct 2023 13:36:18 -0500 Subject: [PATCH] mnt: optimize SHEF parser some more --- parsers/pywwa/workflows/shef_parser.py | 41 ++++++++++---------------- tests/workflows/test_shef_parser.py | 14 ++++----- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/parsers/pywwa/workflows/shef_parser.py b/parsers/pywwa/workflows/shef_parser.py index e1bfdfa4..547fd40f 100644 --- a/parsers/pywwa/workflows/shef_parser.py +++ b/parsers/pywwa/workflows/shef_parser.py @@ -37,11 +37,6 @@ UNKNOWN = {} # station metadata LOCS = {} -# database timezones to cache -TIMEZONES = { - "UTC": ZoneInfo("UTC"), - "America/Chicago": ZoneInfo("America/Chicago"), -} # a queue for saving database IO CURRENT_QUEUE = {} # Data structure to hold potential writes to IEMAccess @@ -52,13 +47,15 @@ [ "station", "network", - "tzname", + "tzinfo", "records", ], ) U1980 = utc(1980) # Networks that can come via SHEF backdoors DOUBLEBACKED_NETWORKS = ["ISUSM", "IA_RWIS"] +P1H = datetime.timedelta(hours=1) +P60D = datetime.timedelta(days=60) DIRECTMAP = { @@ -150,7 +147,11 @@ def load_stations(txn): stid = row["id"] iemid = row["iemid"] network = row["network"] - tzname = row["tzname"] + try: + tzinfo = ZoneInfo(row["tzname"]) + except Exception: + LOG.info("ZoneInfo does not like tzname: %s", row["tzname"]) + tzinfo = ZoneInfo("UTC") pedts = row["pedts"] if stid in UNKNOWN: LOG.info(" station: %s is no longer unknown!", stid) @@ -160,18 +161,12 @@ def load_stations(txn): metadata[network] = { "valid": U1980, "iemid": iemid, - "tzname": tzname, + "tzinfo": tzinfo, "epoc": epoc, "pedts": pedts, } else: metadata[network]["epoc"] = epoc - if tzname not in TIMEZONES: - try: - TIMEZONES[tzname] = ZoneInfo(tzname) - except Exception: - LOG.info("ZoneInfo does not like tzname: %s", tzname) - TIMEZONES[tzname] = ZoneInfo("UTC") # Now we find things that are outdated, note that other code can add # placeholders that can get zapped here. @@ -199,9 +194,9 @@ def restructure_data(prod): if se.type != "R": continue # We don't care about data in the future! - if se.valid > (utcnow + datetime.timedelta(hours=1)): + if se.valid > (utcnow + P1H): continue - if se.valid < (utcnow - datetime.timedelta(days=60)): + if se.valid < (utcnow - P60D): if se.station in old: continue LOG.info("Rejecting old data %s %s", se.station, se.valid) @@ -299,9 +294,7 @@ def get_localtime(sid, ts): if sid not in LOCS: return ts _network = list(LOCS[sid])[0] - return ts.astimezone( - TIMEZONES.get(LOCS[sid][_network]["tzname"], ZoneInfo("UTC")) - ) + return ts.astimezone(LOCS[sid][_network]["tzinfo"]) def get_network(prod, sid, data: List[SHEFElement]) -> str: @@ -410,7 +403,7 @@ def process_site_time(prod, sid, ts, elements: List[SHEFElement]): ACCESSDB_ENTRY( station=sid, network=network, - tzname=metadata["tzname"], + tzinfo=metadata["tzinfo"], records={}, ), ).records.setdefault( @@ -483,7 +476,7 @@ def write_access_record( accesstxn, record: dict, iemid, localts, entry: ACCESSDB_ENTRY ): """The batched database write.""" - iemob = Observation(iemid=iemid, valid=localts, tzname=entry.tzname) + iemob = Observation(iemid=iemid, valid=localts, tzname=entry.tzinfo.key) iemob.data.update(record["data"]) iscoop = entry.network.find("_COOP") > 0 iemob.save(accesstxn, force_current_log=iscoop) @@ -532,10 +525,8 @@ def process_data(text): return prod product_id = prod.get_product_id() # Update CURRENT_QUEUE - utcnow = common.utcnow() - for element in prod.data: - if element.valid > (utcnow + datetime.timedelta(hours=1)): - continue + time_threshold = common.utcnow() + P1H + for element in [e for e in prod.data if e.valid < time_threshold]: update_current_queue(element, product_id) # Create a nicer data structure mydata = restructure_data(prod) diff --git a/tests/workflows/test_shef_parser.py b/tests/workflows/test_shef_parser.py index e1c2a82d..a146d600 100644 --- a/tests/workflows/test_shef_parser.py +++ b/tests/workflows/test_shef_parser.py @@ -58,7 +58,7 @@ def test_missing_value(): "WI_DCP": { "valid": shef_parser.U1980, "iemid": -99, - "tzname": "America/Chicago", + "tzinfo": ZoneInfo("America/Chicago"), "epoc": 1, "pedts": 1, } @@ -73,7 +73,7 @@ def test_midnight(): "IA_DCP": { "valid": shef_parser.U1980, "iemid": -99, - "tzname": "America/Chicago", + "tzinfo": ZoneInfo("America/Chicago"), "epoc": 1, "pedts": 1, } @@ -90,7 +90,7 @@ def test_old_dcpdata(): "IA_DCP": { "valid": shef_parser.U1980, "iemid": -99, - "tzname": "America/Chicago", + "tzinfo": ZoneInfo("America/Chicago"), "epoc": 1, "pedts": 1, } @@ -167,7 +167,7 @@ def test_omit_report(cursor): "MS_COOP": { "valid": shef_parser.U1980, "iemid": -99, - "tzname": "America/Chicago", + "tzinfo": ZoneInfo("America/Chicago"), "epoc": 1, "pedts": 1, } @@ -202,7 +202,7 @@ def test_process_site_eb(): pywwa.CTX.utcnow = utc(2017, 8, 15, 14) shef_parser.process_data(get_example_file("RR7.txt")) entry = shef_parser.ACCESSDB_ENTRY( - station="", network="", tzname="America/Chicago", records={} + station="", network="", tzinfo=ZoneInfo("America/Chicago"), records={} ) record = {"data": {}, "last": utc(), "product_id": ""} shef_parser.write_access_records_eb( @@ -220,14 +220,14 @@ def test_checkvars(): "IA_COOP": { "valid": shef_parser.U1980, "iemid": -99, - "tzname": "America/Chicago", + "tzinfo": ZoneInfo("America/Chicago"), "epoc": 1, "pedts": 1, }, "ISUSM": { "valid": shef_parser.U1980, "iemid": -99, - "tzname": "America/Chicago", + "tzinfo": ZoneInfo("America/Chicago"), "epoc": 1, "pedts": 1, },