Skip to content

Commit

Permalink
Merge pull request #183 from akrherz/gh181_batch_iemaccess
Browse files Browse the repository at this point in the history
fix: refactor IEMAccess logic for #181
  • Loading branch information
akrherz authored Oct 28, 2023
2 parents 3e3f22b + bc4c7af commit 7518c95
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 100 deletions.
14 changes: 14 additions & 0 deletions examples/SHEF/RR8MSR.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
751
SRUS83 KMSR 191448
RR8MSR

.E SOMW3 1019 C DH0900/DIN05/5.06/5.07/5.07/5.06/5.06/5.07/5.06
.E1 5.07/5.06/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07
.E2 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07
.E3 5.07/5.07/5.06/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.06
.E4 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07
.E5 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.06/5.07/5.07/5.07/5.07
.E6 5.07/5.07/5.07/5.07/5.07/5.06/5.07/5.07/5.07/5.07/5.07/5.06
.E7 5.07/5.06/5.06/5.07/5.07/5.07/5.06/5.07/5.07/5.07/5.07/5.07
.E8 5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07/5.07
.E9 5.07/5.07/5.07/5.07/5.07
6 changes: 6 additions & 0 deletions examples/SHEF/RRSNMC.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
262
SXUS72 KWBC 280332
RRSNMC
:&&HADS SOR REPORT FOR USER NMC
.A ALBW3 20231028 DH0300/PPHRG M/PPDRG -9/USIRG 10

222 changes: 134 additions & 88 deletions parsers/pywwa/workflows/shef_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
import datetime
import random
import re
from collections import namedtuple
from typing import List
from zoneinfo import ZoneInfo

# 3rd Party
# pylint: disable=no-name-in-module
from psycopg.errors import DeadlockDetected
from pyiem import reference
from pyiem.models.shef import SHEFElement
from pyiem.nws.products.shef import parser
from pyiem.observation import Observation
from pyiem.util import LOG, convert_value, utc
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.task import LoopingCall, deferLater

# Local
from pywwa import common
Expand All @@ -38,9 +38,24 @@
# station metadata
LOCS = {}
# database timezones to cache
TIMEZONES = {}
TIMEZONES = {
"UTC": ZoneInfo("UTC"),
"America/Chicago": ZoneInfo("America/Chicago"),
}
# a queue for saving database IO
CURRENT_QUEUE = {}
# Data structure to hold potential writes to IEMAccess
# [iemid] -> ACCESSDB_ENTRY -> records -> localts -> {}
ACCESSDB_QUEUE = {}
ACCESSDB_ENTRY = namedtuple(
"ACCESSDB_ENTRY",
[
"station",
"network",
"tzname",
"records",
],
)
U1980 = utc(1980)
# Networks that can come via SHEF backdoors
DOUBLEBACKED_NETWORKS = ["ISUSM", "IA_RWIS"]
Expand Down Expand Up @@ -245,6 +260,7 @@ def save_current() -> int:
continue
cnt += 1
(sid, varname, _depth) = k.split("|")
# This is unlikely, but still GIGO sometimes :/
if len(varname) != 7:
LOG.info("Got varname of '%s' somehow? %s", varname, mydict)
continue
Expand Down Expand Up @@ -289,8 +305,12 @@ def get_localtime(sid, ts):


def get_network(prod, sid, data: List[SHEFElement]) -> str:
"""Figure out which network this belongs to"""
networks = list(LOCS.get(sid, {}).keys())
"""Logic for figuring out network in face of ambiguity.
Note: This sid should already be in LOCS, so we are only either taking
the single entry or picking between DCP and COOP variants.
"""
networks = list(LOCS[sid].keys())
# This is the best we can hope for
if len(networks) == 1:
return networks[0]
Expand All @@ -306,44 +326,21 @@ def get_network(prod, sid, data: List[SHEFElement]) -> str:
is_coop = True
pnetwork = "COOP" if is_coop else "DCP"
# filter networks now
networks = [s for s in networks if s.find(pnetwork) > 0]
if len(networks) == 1:
return networks[0]
for network in networks:
if network.find(pnetwork) > 0:
return network
# Throw our hands up in the air
return networks[0]


# If networks is zero length, then we have to try some things
if not networks:
HADSDB.runInteraction(enter_unknown, sid, prod.get_product_id(), "")
if len(sid) == 5:
state = reference.nwsli2state.get(sid[3:])
country = reference.nwsli2country.get(sid[3:])
if country in ["CA", "MX"]:
return f"{country}_{state}_{pnetwork}"
if country == "US":
return f"{state}_{pnetwork}"
return f"{country}__{pnetwork}"

if sid not in UNKNOWN:
UNKNOWN[sid] = True
LOG.info("failure for sid: %s tp: %s", sid, prod.get_product_id())
return None


def process_site_frontend(prod, sid, data):
    """Run process_site() as an ACCESSDB interaction and attach errbacks.

    The errbacks are chained in this order: process_site_eb (given the same
    prod/sid/data), then common.email_error with the product text, and
    finally LOG.error.
    """
    deferred = ACCESSDB.runInteraction(process_site, prod, sid, data)
    # First chance at the failure goes to the dedicated errback
    deferred.addErrback(process_site_eb, prod, sid, data)
    # Anything that errback itself raises gets emailed with the raw text
    deferred.addErrback(common.email_error, prod.unixtext)
    # Last resort, just log it
    deferred.addErrback(LOG.error)


def process_site(accesstxn, prod, sid, data):
def process_site(prod, sid, data):
"""Consumption of rectified data."""
# Order the timestamps so that we process the newest data last, so that
# all obs are potentially processed through iemaccess
times = list(data.keys())
times.sort()
for tstamp in times:
process_site_time(accesstxn, prod, sid, tstamp, data[tstamp])
process_site_time(prod, sid, tstamp, data[tstamp])


def insert_raw_inbound(cursor, element: SHEFElement) -> int:
Expand Down Expand Up @@ -391,46 +388,45 @@ def update_current_queue(element: SHEFElement, product_id: str):
cur["dirty"] = True


def process_site_time(accesstxn, prod, sid, ts, elements: List[SHEFElement]):
def process_site_time(prod, sid, ts, elements: List[SHEFElement]):
"""Ingest for IEMAccess."""
network = get_network(prod, sid, elements)
if network is None or network in DOUBLEBACKED_NETWORKS:
return

localts = get_localtime(sid, ts)
# This should always work!
metadata = LOCS[sid][network]
# Do not send DCP sites with old data to IEMAccess
if network.find("_DCP") > 0 and localts < LOCS.get(sid, {}).get(
"valid", localts
):
return
metadata = LOCS.get(sid, {}).get(network)
if metadata is None:
LOG.info("Unknown station: %s %s", sid, network)
if network.find("_DCP") > 0 and localts < metadata["valid"]:
return
metadata["valid"] = localts

# Okay, time for a hack, if our observation is at midnight!
if localts.hour == 0 and localts.minute == 0:
localts -= datetime.timedelta(minutes=1)
# LOG.info("Shifting %s [%s] back one minute: %s" % (sid, network,
# localts))

iemob = Observation(
iemid=metadata["iemid"],
valid=localts,
tzname=metadata["tzname"],
record = ACCESSDB_QUEUE.setdefault(
metadata["iemid"],
ACCESSDB_ENTRY(
station=sid,
network=network,
tzname=metadata["tzname"],
records={},
),
).records.setdefault(
localts,
{
"last": U1980,
"data": {},
"product_id": prod.get_product_id(),
},
)
# This is likely going to kill performance, but we need to do it for now
iemob.load(accesstxn)
iscoop = network.find("COOP") > 0
hasdata = False
# TODO Special rstage logic in case PEDTS is defined
# pedts = metadata[network]["pedts"]
# Update last
record["last"] = utc()

report = None
afos = prod.afos
for se in elements:
if se.type != "R":
continue
if se.narrative:
report = se.narrative
if (
Expand All @@ -446,24 +442,22 @@ def process_site_time(accesstxn, prod, sid, ts, elements: List[SHEFElement]):
# We generally are in english units, this also converts the wind
# direction to the full degrees
val = se.to_english()
# TODO it is not clear if we can hit this code or not
if val is None:
# Behold, glorious hack here to force nulls into the summary
# database that uses coerce
iemob.data[f"null_{iemvar}"] = None
hasdata = True
iemob.data[iemvar] = val
record["data"][f"null_{iemvar}"] = None
record["data"][iemvar] = val
if iemvar in ["pday", "snow", "snowd"]:
# Prevent negative numbers
if val is not None and val < 0:
iemob.data[iemvar] = 0
record["data"][iemvar] = 0
if iemvar in ["sknt", "gust"] and val is not None:
# mph to knots :/
val = convert_value(val, "mile / hour", "knot")
if iscoop:
if network.find("_COOP") > 0:
# Save COOP 'at-ob' temperature into summary table
if iemvar == "tmpf":
iemob.data["coop_tmpf"] = val
record["data"]["coop_tmpf"] = val
# Save observation time into the summary table
if iemvar in [
"tmpf",
Expand All @@ -473,21 +467,26 @@ def process_site_time(accesstxn, prod, sid, ts, elements: List[SHEFElement]):
"snow",
"snowd",
]:
iemob.data["coop_valid"] = iemob.data["valid"]
# if pedts is not None and f"{pedts}Z" in data:
# val = None if data[f"{pedts}Z"] < -9998 else data[f"{pedts}Z"]
# iemob.data["rstage"] = val
record["data"]["coop_valid"] = localts

if not hasdata:
return
iemob.data["raw"] = prod.get_product_id()
iemob.data["report"] = report
# Only force COOP data into current_log even if we have newer obs
if not iemob.save(accesstxn, force_current_log=iscoop):
LOG.info("Failed to save %s %s", sid, iemob)
HADSDB.runInteraction(
enter_unknown, sid, prod.get_product_id(), network
)
record["data"]["raw"] = prod.get_product_id()
record["data"]["report"] = report


def write_access_records(
    accesstxn, records: list, iemid, entry: ACCESSDB_ENTRY
):
    """Write a batch of queued records for one station.

    Batching the writes per station is an attempt to prevent database
    deadlocks.

    Args:
      accesstxn: the database transaction handed in by
        ACCESSDB.runInteraction.
      records: list of ``(localts, record)`` tuples, as assembled by
        process_accessdb.
      iemid: the station's IEM identifier, passed through to each write.
      entry: the ACCESSDB_ENTRY holding the station's metadata.
    """
    for localts, record in records:
        write_access_record(accesstxn, record, iemid, localts, entry)


def write_access_record(
    accesstxn, record: dict, iemid, localts, entry: ACCESSDB_ENTRY
):
    """Save one queued record by way of a pyiem Observation.

    An Observation is created for ``iemid`` valid at ``localts`` using the
    entry's timezone name, the accumulated ``record["data"]`` values are
    copied onto it, and it is then saved within ``accesstxn``.
    """
    observation = Observation(
        iemid=iemid,
        valid=localts,
        tzname=entry.tzname,
    )
    observation.data.update(record["data"])
    # Only COOP networks force their data into current_log
    force = entry.network.find("_COOP") > 0
    observation.save(accesstxn, force_current_log=force)


def log_database_queue_size():
Expand All @@ -500,20 +499,28 @@ def log_database_queue_size():
)


def process_site_eb(err, prod, sid, data):
def write_access_records_eb(err, records: list, iemid, entry: ACCESSDB_ENTRY):
"""Errorback from process_site transaction."""
if isinstance(err.value, DeadlockDetected):
jitter = random.randint(0, 30)
LOG.info(
"Database Deadlock: prod:%s sid:%s, retrying in %s seconds",
prod.get_product_id(),
sid,
"Database Deadlock: %s[%s], retrying in %s seconds",
entry.station,
entry.network,
jitter,
)
df = deferLater(
reactor,
jitter,
ACCESSDB.runInteraction,
write_access_records,
records,
iemid,
entry,
)
reactor.callLater(jitter, process_site_frontend, prod, sid, data)
df.addErrback(common.email_error)
return
msg = f"process_site({prod.get_product_id()}, {sid}, {data}) got {err}"
common.email_error(err, msg)
common.email_error(err, f"write_access_entry({entry.station}) got {err}")


def process_data(text):
Expand All @@ -538,7 +545,7 @@ def process_data(text):
if sid in UNKNOWN:
continue
if sid in LOCS:
process_site_frontend(prod, sid, data)
process_site(prod, sid, data)
else:
UNKNOWN[sid] = True
if NWSLIRE.match(sid) is not None:
Expand All @@ -547,6 +554,41 @@ def process_data(text):
return prod


def process_accessdb_frontend():
    """Call process_accessdb() and log, rather than raise, any failure.

    This wrapper exists so that the LoopingCall driving it keeps going
    after an exception.
    """
    try:
        process_accessdb()
    except Exception as err:
        # Record the failure and let the next scheduled call proceed
        LOG.exception(err)


def process_accessdb():
    """Dispatch queued IEMAccess records that have sat idle long enough.

    For each station entry in ACCESSDB_QUEUE, every record whose ``last``
    update is at least five minutes old is removed from the queue.  The
    removed records are sent, one batch per station, to
    write_access_records through ACCESSDB.runInteraction.
    """
    cutoff = utc() - datetime.timedelta(minutes=5)
    for iemid, entry in ACCESSDB_QUEUE.items():
        pending = entry.records
        # Collect the ready timestamps first, as we can not pop from the
        # dict while iterating over it.  Newer records are left alone to
        # allow a window of time to accumulate data prior to writing.
        ready = [
            localts
            for localts, record in pending.items()
            if record["last"] <= cutoff
        ]
        if not ready:
            continue
        # Take each record out of the queue as we hand it off
        batch = [(localts, pending.pop(localts)) for localts in ready]
        df = ACCESSDB.runInteraction(
            write_access_records,
            batch,
            iemid,
            entry,
        )
        df.addErrback(write_access_records_eb, batch, iemid, entry)
        df.addErrback(common.email_error)


def main2(_res):
"""Go main Go!"""
LOG.info("main() fired!")
Expand All @@ -564,6 +606,10 @@ def main2(_res):
lc3 = LoopingCall(MESOSITEDB.runInteraction, load_stations)
df3 = lc3.start(60 * 60 * 12, now=False)
df3.addErrback(common.email_error)
# Process entries in ACCESSDB_QUEUE
lc4 = LoopingCall(process_accessdb_frontend)
df4 = lc4.start(15, now=False)
df4.addErrback(common.email_error)


def main():
Expand Down
Loading

0 comments on commit 7518c95

Please sign in to comment.