Skip to content

Commit

Permalink
Merge pull request #132 from pastas/improve_hpd
Browse files Browse the repository at this point in the history
Improve hpd
  • Loading branch information
dbrakenhoff authored Aug 9, 2024
2 parents fa7cee4 + e8e15d4 commit 62161f2
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pastastore/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ def activate_hydropandas_extension():

print(
"Registered HydroPandas extension in PastaStore class, "
"e.g. `pstore.hpd.download_bro()`."
"e.g. `pstore.hpd.download_bro_gmw()`."
)
222 changes: 147 additions & 75 deletions pastastore/extensions/hpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"""

import logging
from typing import Optional, Union
from typing import List, Optional, Union

import hydropandas as hpd
import numpy as np
Expand Down Expand Up @@ -50,6 +50,7 @@ def add_obscollection(
kind: Optional[str] = None,
data_column: Optional[str] = None,
unit_multiplier: float = 1.0,
update: bool = False,
):
"""Add an ObsCollection to the PastaStore.
Expand All @@ -66,12 +67,20 @@ def add_obscollection(
name of column containing observation values, by default None.
unit_multiplier : float, optional
multiply unit by this value before saving it in the store
update : bool, optional
if True, update currently stored time series with new data
"""
for name, row in oc.iterrows():
obs = row["obs"]
# metadata = row.drop("obs").to_dict()
self.add_observation(
libname, obs, name=name, kind=kind, data_column=data_column
libname,
obs,
name=name,
kind=kind,
data_column=data_column,
unit_multiplier=unit_multiplier,
update=update,
)

def add_observation(
Expand All @@ -82,6 +91,7 @@ def add_observation(
kind: Optional[str] = None,
data_column: Optional[str] = None,
unit_multiplier: float = 1.0,
update: bool = False,
):
"""Add an hydropandas observation series to the PastaStore.
Expand All @@ -101,6 +111,8 @@ def add_observation(
name of column containing observation values, by default None.
unit_multiplier : float, optional
multiply unit by this value before saving it in the store
update : bool, optional
if True, update currently stored time series with new data
"""
# if data_column is not None, use data_column
if data_column is not None:
Expand All @@ -126,6 +138,13 @@ def add_observation(
# gather metadata from obs object
metadata = {key: getattr(obs, key) for key in obs._metadata}

# convert np dtypes to builtins
for k, v in metadata.items():
if isinstance(v, np.integer):
metadata[k] = int(v)
elif isinstance(v, np.floating):
metadata[k] = float(v)

metadata.pop("name", None)
metadata.pop("meta", None)
unit = metadata.get("unit", None)
Expand All @@ -138,18 +157,28 @@ def add_observation(
if len(source) > 0:
source = f"{source} "

if update:
action_msg = "updated in"
else:
action_msg = "added to"

if libname == "oseries":
self._store.add_oseries(o, name, metadata=metadata)
logger.info("%sobservation '%s' added to oseries library.", source, name)
self._store.upsert_oseries(o, name, metadata=metadata)
logger.info(
"%sobservation '%s' %s oseries library.", source, name, action_msg
)
elif libname == "stresses":
if kind is None:
raise ValueError("`kind` must be specified for stresses!")
self._store.add_stress(o * unit_multiplier, name, kind, metadata=metadata)
self._store.upsert_stress(
o * unit_multiplier, name, kind, metadata=metadata
)
logger.info(
"%sstress '%s' (kind='%s') added to stresses library.",
"%sstress '%s' (kind='%s') %s stresses library.",
source,
name,
kind,
action_msg,
)
else:
raise ValueError("libname must be 'oseries' or 'stresses'.")
Expand All @@ -161,7 +190,6 @@ def download_knmi_precipitation(
tmin: TimeType = None,
tmax: TimeType = None,
unit_multiplier: float = 1e3,
update: bool = False,
**kwargs,
):
"""Download precipitation data from KNMI and store in PastaStore.
Expand All @@ -179,8 +207,6 @@ def download_knmi_precipitation(
unit_multiplier : float, optional
multiply unit by this value before saving it in the store,
by default 1e3 to convert m to mm
update : bool, optional
if True, update currently stored precipitation time series with new data
"""
self.download_knmi_meteo(
meteo_var=meteo_var,
Expand All @@ -189,7 +215,6 @@ def download_knmi_precipitation(
tmin=tmin,
tmax=tmax,
unit_multiplier=unit_multiplier,
update=update,
**kwargs,
)

Expand All @@ -200,7 +225,6 @@ def download_knmi_evaporation(
tmin: TimeType = None,
tmax: TimeType = None,
unit_multiplier: float = 1e3,
update: bool = False,
**kwargs,
):
"""Download evaporation data from KNMI and store in PastaStore.
Expand All @@ -218,8 +242,6 @@ def download_knmi_evaporation(
unit_multiplier : float, optional
multiply unit by this value before saving it in the store,
by default 1e3 to convert m to mm
update : bool, optional
if True, update currently stored evaporation time series with new data
"""
self.download_knmi_meteo(
meteo_var=meteo_var,
Expand All @@ -228,7 +250,6 @@ def download_knmi_evaporation(
tmin=tmin,
tmax=tmax,
unit_multiplier=unit_multiplier,
update=update,
**kwargs,
)

Expand All @@ -240,7 +261,6 @@ def download_knmi_meteo(
tmin: TimeType = None,
tmax: TimeType = None,
unit_multiplier: float = 1.0,
update: bool = False,
**kwargs,
):
"""Download meteorological data from KNMI and store in PastaStore.
Expand All @@ -261,41 +281,15 @@ def download_knmi_meteo(
unit_multiplier : float, optional
multiply unit by this value before saving it in the store,
by default 1.0 (no conversion)
update : bool, optional
if True, update currently stored precipitation time series with new data
"""
# get tmin/tmax if not specified
if update:
stressnames = self._store.stresses.loc[
self._store.stresses["kind"] == kind
].index.tolist()
tmintmax = self._store.get_tmin_tmax("stresses", names=stressnames)
if tmin is None:
tmin = tmintmax.loc[:, "tmax"].min()
if tmax is None:
tmax = Timestamp.now().normalize()
else:
tmintmax = self._store.get_tmin_tmax("oseries")
if tmin is None:
tmin = tmintmax.loc[:, "tmin"].min() - Timedelta(days=10 * 365)
if tmax is None:
tmax = tmintmax.loc[:, "tmax"].max()
tmintmax = self._store.get_tmin_tmax("oseries")
if tmin is None:
tmin = tmintmax.loc[:, "tmin"].min() - Timedelta(days=10 * 365)
if tmax is None:
tmax = tmintmax.loc[:, "tmax"].max()

# if update, only download data for stations in store
if update:
locations = None
if stns is None:
stns = self._store.stresses.loc[stressnames, "station"].tolist()
else:
check = np.isin(
stns, self._store.stresses.loc[stressnames, "station"].values
)
if not check.all():
raise ValueError(
"Not all specified stations are in the store: "
f"{np.array(stns)[~check]}"
)
elif stns is None:
if stns is None:
locations = self._store.oseries.loc[:, ["x", "y"]]
else:
locations = None
Expand All @@ -317,14 +311,71 @@ def download_knmi_meteo(
kind=kind,
data_column=meteo_var,
unit_multiplier=unit_multiplier,
update=False,
)

def update_knmi_meteo(
self,
names: Optional[List[str]] = None,
tmin: TimeType = None,
tmax: TimeType = None,
):
"""Update meteorological data from KNMI in PastaStore.
Parameters
----------
names : list of str, optional
list of names of observations to update, by default None
tmin : TimeType, optional
start time, by default None, which uses current last observation timestamp
as tmin
tmax : TimeType, optional
end time, by default None, which defaults to today
"""
if names is None:
names = self._store.stresses.loc[
self._store.stresses["source"] == "KNMI"
].index.tolist()

tmintmax = self._store.get_tmin_tmax("stresses", names=names)

for name in tqdm(names, desc="Updating KNMI meteo stresses"):
stn = self._store.stresses.loc[name, "station"]
meteo_var = self._store.stresses.loc[name, "meteo_var"]
unit = self._store.stresses.loc[name, "unit"]
kind = self._store.stresses.loc[name, "kind"]

if unit == "mm":
unit_multiplier = 1e3
else:
unit_multiplier = 1.0

if tmin is None:
tmin = tmintmax.loc[name, "tmax"]

knmi = hpd.read_knmi(
stns=[stn],
meteo_vars=[meteo_var],
starts=tmin,
ends=tmax,
)

self.add_observation(
"stresses",
knmi["obs"].iloc[0],
name=name,
kind=kind,
data_column=meteo_var,
unit_multiplier=unit_multiplier,
update=True,
)

def download_bro_gmw(
self,
extent=None,
tmin=None,
tmax=None,
update=False,
extent: Optional[List[float]] = None,
tmin: TimeType = None,
tmax: TimeType = None,
update: bool = False,
**kwargs,
):
"""Download groundwater monitoring well observations from BRO.
Expand All @@ -337,32 +388,53 @@ def download_bro_gmw(
Start date of the observations to download.
tmax: pandas.Timestamp, optional
End date of the observations to download.
update: bool, optional
If True, update existing observations in the store.
**kwargs: dict, optional
Additional keyword arguments to pass to `hpd.read_bro()`
"""
if extent is not None and update:
raise ValueError("Cannot specify extent AND update=True.")
elif extent is None and not update:
raise ValueError("Either extent or update=True must be specified.")
bro = hpd.read_bro(
extent=extent,
tmin=tmin,
tmax=tmax,
**kwargs,
)
self.add_obscollection("oseries", bro, data_column="values", update=update)

if update:
tmintmax = self._store.get_tmin_tmax("oseries")
for obsnam in tqdm(
self._store.oseries.index, desc="Updating oseries from BRO"
):
bro_id, tube_number = obsnam.split("_")
tmin, tmax = tmintmax.loc[obsnam]
obs = hpd.GroundWaterObs.from_bro(
bro_id, int(tube_number), tmin=tmin, tmax=tmax, **kwargs
)
self.add_observation("oseries", obs, name=obsnam, data_column="values")
else:
bro = hpd.read_bro(
extent=extent,
tmin=tmin,
tmax=tmax,
**kwargs,
def update_bro_gmw(
self,
names: Optional[List[str]] = None,
tmin: TimeType = None,
tmax: TimeType = None,
**kwargs,
):
"""Update groundwater monitoring well observations from BRO.
Parameters
----------
names : list of str, optional
list of names of observations to update, by default None which updates all
stored oseries.
tmin : TimeType, optional
start time, by default None, which uses current last observation timestamp
as tmin
tmax : TimeType, optional
end time, by default None, which defaults to today
**kwargs : dict, optional
Additional keyword arguments to pass to `hpd.GroundwaterObs.from_bro()`
"""
if names is None:
names = self._store.oseries.index.to_list()

tmintmax = self._store.get_tmin_tmax("oseries")

for obsnam in tqdm(names, desc="Updating BRO oseries"):
bro_id, tube_number = obsnam.split("_")

if tmin is None:
_, tmin = tmintmax.loc[obsnam] # tmin is stored tmax

obs = hpd.GroundwaterObs.from_bro(
bro_id, int(tube_number), tmin=tmin, tmax=tmax, **kwargs
)
self.add_observation(
"oseries", obs, name=obsnam, data_column="values", update=True
)
self.add_obscollection("oseries", bro, data_column="values")
3 changes: 2 additions & 1 deletion pastastore/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
PASTAS_LEQ_022 = PASTAS_VERSION <= parse_version("0.22.0")
PASTAS_GEQ_150 = PASTAS_VERSION >= parse_version("1.5.0")

__version__ = "1.5.0"
__version__ = "1.6.0"


def show_versions(optional=False) -> None:
Expand All @@ -21,6 +21,7 @@ def show_versions(optional=False) -> None:
Print the version of optional dependencies, by default False
"""
msg = (
f"Pastastore version : {__version__}\n\n"
f"Python version : {python_version()}\n"
f"Pandas version : {metadata.version('pandas')}\n"
f"Matplotlib version : {metadata.version('matplotlib')}\n"
Expand Down
Binary file added tests/data/test_hpd_update.zip
Binary file not shown.
Loading

0 comments on commit 62161f2

Please sign in to comment.