Merge pull request #184 from akrherz/shef
mnt: optimize SHEF parser some more
akrherz authored Oct 28, 2023
2 parents 7518c95 + 4f9581b commit 30b961f
Showing 2 changed files with 23 additions and 32 deletions.
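At a glance the optimization has two parts: station metadata now carries a ready ZoneInfo object (so the module-level, string-keyed TIMEZONES cache and its per-conversion lookups disappear), and the one-hour / sixty-day timedeltas used to window incoming data become module constants instead of being rebuilt inside the loops. A minimal sketch of the constants half, using only the standard library; is_plausible is a hypothetical helper mirroring the window checks, not a function in the parser:

import datetime

# Built once at import time; reused for every SHEF element rather than
# constructing datetime.timedelta(...) on each pass through the loop.
P1H = datetime.timedelta(hours=1)
P60D = datetime.timedelta(days=60)

def is_plausible(valid, utcnow):
    """Keep timestamps that are neither in the future nor older than ~60 days."""
    return (utcnow - P60D) <= valid <= (utcnow + P1H)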
41 changes: 16 additions & 25 deletions parsers/pywwa/workflows/shef_parser.py
@@ -37,11 +37,6 @@
UNKNOWN = {}
# station metadata
LOCS = {}
- # database timezones to cache
- TIMEZONES = {
- "UTC": ZoneInfo("UTC"),
- "America/Chicago": ZoneInfo("America/Chicago"),
- }
# a queue for saving database IO
CURRENT_QUEUE = {}
# Data structure to hold potential writes to IEMAccess
@@ -52,13 +47,15 @@
[
"station",
"network",
"tzname",
"tzinfo",
"records",
],
)
U1980 = utc(1980)
# Networks that can come via SHEF backdoors
DOUBLEBACKED_NETWORKS = ["ISUSM", "IA_RWIS"]
+ P1H = datetime.timedelta(hours=1)
+ P60D = datetime.timedelta(days=60)


DIRECTMAP = {
@@ -150,7 +147,11 @@ def load_stations(txn):
stid = row["id"]
iemid = row["iemid"]
network = row["network"]
- tzname = row["tzname"]
+ try:
+ tzinfo = ZoneInfo(row["tzname"])
+ except Exception:
+ LOG.info("ZoneInfo does not like tzname: %s", row["tzname"])
+ tzinfo = ZoneInfo("UTC")
pedts = row["pedts"]
if stid in UNKNOWN:
LOG.info(" station: %s is no longer unknown!", stid)
@@ -160,18 +161,12 @@
metadata[network] = {
"valid": U1980,
"iemid": iemid,
"tzname": tzname,
"tzinfo": tzinfo,
"epoc": epoc,
"pedts": pedts,
}
else:
metadata[network]["epoc"] = epoc
- if tzname not in TIMEZONES:
- try:
- TIMEZONES[tzname] = ZoneInfo(tzname)
- except Exception:
- LOG.info("ZoneInfo does not like tzname: %s", tzname)
- TIMEZONES[tzname] = ZoneInfo("UTC")

# Now we find things that are outdated, note that other code can add
# placeholders that can get zapped here.
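Resolving the timezone now happens once per station row inside load_stations(), with a UTC fallback for names that zoneinfo rejects; roughly the same pattern as a standalone helper (the function name is illustrative, not from the source):

import logging
from zoneinfo import ZoneInfo

LOG = logging.getLogger(__name__)

def resolve_tzinfo(tzname: str) -> ZoneInfo:
    """Return a ZoneInfo for tzname, falling back to UTC on unknown or malformed names."""
    try:
        return ZoneInfo(tzname)
    except Exception:
        LOG.info("ZoneInfo does not like tzname: %s", tzname)
        return ZoneInfo("UTC")

print(resolve_tzinfo("America/Chicago"))  # America/Chicago
print(resolve_tzinfo("Not/AZone"))        # falls back to UTC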
@@ -199,9 +194,9 @@ def restructure_data(prod):
if se.type != "R":
continue
# We don't care about data in the future!
- if se.valid > (utcnow + datetime.timedelta(hours=1)):
+ if se.valid > (utcnow + P1H):
continue
- if se.valid < (utcnow - datetime.timedelta(days=60)):
+ if se.valid < (utcnow - P60D):
if se.station in old:
continue
LOG.info("Rejecting old data %s %s", se.station, se.valid)
@@ -299,9 +294,7 @@ def get_localtime(sid, ts):
if sid not in LOCS:
return ts
_network = list(LOCS[sid])[0]
- return ts.astimezone(
- TIMEZONES.get(LOCS[sid][_network]["tzname"], ZoneInfo("UTC"))
- )
+ return ts.astimezone(LOCS[sid][_network]["tzinfo"])


def get_network(prod, sid, data: List[SHEFElement]) -> str:
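With the ZoneInfo stored under each network in LOCS, get_localtime() collapses to a dictionary fetch plus astimezone(); a small usage sketch under assumed data (the station id and LOCS layout here are illustrative):

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Assumed shape of a LOCS entry after load_stations() has populated it
LOCS = {"AMWI4": {"IA_DCP": {"tzinfo": ZoneInfo("America/Chicago")}}}

def get_localtime(sid, ts):
    """Mirror of the simplified lookup: convert ts to the station's local time."""
    if sid not in LOCS:
        return ts
    _network = list(LOCS[sid])[0]
    return ts.astimezone(LOCS[sid][_network]["tzinfo"])

ts = datetime(2023, 10, 28, 18, tzinfo=timezone.utc)
print(get_localtime("AMWI4", ts))  # 2023-10-28 13:00:00-05:00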
@@ -410,7 +403,7 @@ def process_site_time(prod, sid, ts, elements: List[SHEFElement]):
ACCESSDB_ENTRY(
station=sid,
network=network,
- tzname=metadata["tzname"],
+ tzinfo=metadata["tzinfo"],
records={},
),
).records.setdefault(
@@ -483,7 +476,7 @@ def write_access_record(
accesstxn, record: dict, iemid, localts, entry: ACCESSDB_ENTRY
):
"""The batched database write."""
- iemob = Observation(iemid=iemid, valid=localts, tzname=entry.tzname)
+ iemob = Observation(iemid=iemid, valid=localts, tzname=entry.tzinfo.key)
iemob.data.update(record["data"])
iscoop = entry.network.find("_COOP") > 0
iemob.save(accesstxn, force_current_log=iscoop)
@@ -532,10 +525,8 @@ def process_data(text):
return prod
product_id = prod.get_product_id()
# Update CURRENT_QUEUE
- utcnow = common.utcnow()
- for element in prod.data:
- if element.valid > (utcnow + datetime.timedelta(hours=1)):
- continue
+ time_threshold = common.utcnow() + P1H
+ for element in [e for e in prod.data if e.valid < time_threshold]:
update_current_queue(element, product_id)
# Create a nicer data structure
mydata = restructure_data(prod)
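The loop in process_data() now computes the cutoff once and filters with a comprehension rather than building a timedelta per element; the same idea in isolation (the element and callback names are made up for the sketch):

import datetime
from collections import namedtuple

P1H = datetime.timedelta(hours=1)
Element = namedtuple("Element", ["valid"])  # stand-in for a SHEFElement

def queue_current(elements, utcnow, update_current_queue):
    """Queue elements that are not more than an hour in the future."""
    time_threshold = utcnow + P1H
    for element in [e for e in elements if e.valid < time_threshold]:
        update_current_queue(element)

now = datetime.datetime(2023, 10, 28, 18, tzinfo=datetime.timezone.utc)
queue_current(
    [Element(now), Element(now + datetime.timedelta(hours=2))],  # second one is dropped
    now,
    lambda e: print("queued", e.valid),
)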
14 changes: 7 additions & 7 deletions tests/workflows/test_shef_parser.py
@@ -58,7 +58,7 @@ def test_missing_value():
"WI_DCP": {
"valid": shef_parser.U1980,
"iemid": -99,
"tzname": "America/Chicago",
"tzinfo": ZoneInfo("America/Chicago"),
"epoc": 1,
"pedts": 1,
}
@@ -73,7 +73,7 @@ def test_midnight():
"IA_DCP": {
"valid": shef_parser.U1980,
"iemid": -99,
"tzname": "America/Chicago",
"tzinfo": ZoneInfo("America/Chicago"),
"epoc": 1,
"pedts": 1,
}
@@ -90,7 +90,7 @@ def test_old_dcpdata():
"IA_DCP": {
"valid": shef_parser.U1980,
"iemid": -99,
"tzname": "America/Chicago",
"tzinfo": ZoneInfo("America/Chicago"),
"epoc": 1,
"pedts": 1,
}
@@ -167,7 +167,7 @@ def test_omit_report(cursor):
"MS_COOP": {
"valid": shef_parser.U1980,
"iemid": -99,
"tzname": "America/Chicago",
"tzinfo": ZoneInfo("America/Chicago"),
"epoc": 1,
"pedts": 1,
}
@@ -202,7 +202,7 @@ def test_process_site_eb():
pywwa.CTX.utcnow = utc(2017, 8, 15, 14)
shef_parser.process_data(get_example_file("RR7.txt"))
entry = shef_parser.ACCESSDB_ENTRY(
station="", network="", tzname="America/Chicago", records={}
station="", network="", tzinfo=ZoneInfo("America/Chicago"), records={}
)
record = {"data": {}, "last": utc(), "product_id": ""}
shef_parser.write_access_records_eb(
@@ -220,14 +220,14 @@ def test_checkvars():
"IA_COOP": {
"valid": shef_parser.U1980,
"iemid": -99,
"tzname": "America/Chicago",
"tzinfo": ZoneInfo("America/Chicago"),
"epoc": 1,
"pedts": 1,
},
"ISUSM": {
"valid": shef_parser.U1980,
"iemid": -99,
"tzname": "America/Chicago",
"tzinfo": ZoneInfo("America/Chicago"),
"epoc": 1,
"pedts": 1,
},
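Because the fixtures now hold ZoneInfo objects rather than tzname strings, the test module presumably imports ZoneInfo directly alongside its existing imports; the per-network metadata stub used throughout these tests ends up shaped like this (values copied from the hunks above):

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Shape of the metadata stub the tests now build; in the real tests
# "valid" is shef_parser.U1980, which the module defines as utc(1980).
entry = {
    "valid": datetime(1980, 1, 1, tzinfo=timezone.utc),
    "iemid": -99,
    "tzinfo": ZoneInfo("America/Chicago"),
    "epoc": 1,
    "pedts": 1,
}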
