From 29d8b814f3f022d63d3e070231f6e59b45814207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Mon, 11 Nov 2024 08:34:01 -0800 Subject: [PATCH] Update DB record during fill (#10) * Refactor code to update DB record during fill * Rename done -> complete --- src/lvmcryo/__main__.py | 43 +++------- src/lvmcryo/handlers/ln2.py | 20 ++++- src/lvmcryo/runner.py | 4 + src/lvmcryo/tools.py | 154 ++++++++++++++++++++++++++---------- 4 files changed, 149 insertions(+), 72 deletions(-) diff --git a/src/lvmcryo/__main__.py b/src/lvmcryo/__main__.py index f505c8a..1f1b196 100644 --- a/src/lvmcryo/__main__.py +++ b/src/lvmcryo/__main__.py @@ -21,6 +21,7 @@ from typer.core import TyperGroup from lvmcryo.config import Actions, InteractiveMode, NotificationLevel +from lvmcryo.tools import DBHandler LOCKFILE = pathlib.Path("/data/lvmcryo.lock") @@ -393,7 +394,6 @@ async def ln2( """ - import json from logging import FileHandler from tempfile import NamedTemporaryFile @@ -410,7 +410,6 @@ async def ln2( LockExistsError, add_json_handler, ensure_lock, - write_fill_to_db, ) from lvmcryo.validate import validate_fill @@ -426,7 +425,6 @@ async def ln2( json_path: pathlib.Path | None = None json_handler: FileHandler | None = None - log_data: list[dict] | None = None images: dict[str, pathlib.Path | None] = {} if action == Actions.abort: @@ -543,10 +541,15 @@ async def ln2( log.warning("Lock file exists. Removing it because --clear-lock.") LOCKFILE.unlink() + db_handler = DBHandler(action, handler, config, json_handler=json_handler) + record_pk = await db_handler.write(complete=False) + if record_pk: + log.debug(f"Record {record_pk} created in the database.") + try: with ensure_lock(LOCKFILE): # Run worker. - await ln2_runner(handler, config, notifier) + await ln2_runner(handler, config, notifier, db_handler=db_handler) # Check handler status. if handler.failed: @@ -605,10 +608,10 @@ async def ln2( log.info(f"Event times:\n{handler.event_times.model_dump_json(indent=2)}") if not skip_finally: - configuration_json = config.model_dump() | { - valve: valve_model.model_dump() - for valve, valve_model in config.valve_info.items() - } + # Do a quick update of the DB record since post_fill_tasks() may + # block for a long time. + await db_handler.write(error=error) + plot_paths = await post_fill_tasks( handler, notifier=notifier, @@ -638,29 +641,7 @@ async def ln2( error = validate_error log.info("Writing fill metadata to database.") - if json_handler and json_path: - json_handler.flush() - with json_path.open("r") as ff: - log_data = [json.loads(line) for line in ff.readlines()] - - record_pk = await write_fill_to_db( - handler, - api_db_route=config.internal_config["api_routes"]["register_fill"], - plot_paths=plot_paths, - db_extra_payload={ - "error": str(error) if error is not None else None, - "action": action.value, - "log_file": str(config.log_path) if config.log_path else None, - "json_file": str(json_path) - if json_path and config.write_json - else None, - "log_data": log_data, - "configuration": configuration_json, - "valve_times": handler.get_valve_times(as_string=True), - }, - ) - if record_pk: - log.debug(f"Record {record_pk} created in the database.") + await db_handler.write(complete=True, plot_paths=plot_paths, error=error) if config.notify: images = { diff --git a/src/lvmcryo/handlers/ln2.py b/src/lvmcryo/handlers/ln2.py index d36cbc1..9e2e0e9 100644 --- a/src/lvmcryo/handlers/ln2.py +++ b/src/lvmcryo/handlers/ln2.py @@ -13,7 +13,7 @@ import logging from dataclasses import dataclass, field -from typing import Coroutine, Literal, NoReturn, overload +from typing import Any, Callable, Coroutine, Literal, NoReturn, overload import sshkeyboard from pydantic import BaseModel, field_serializer @@ -281,6 +281,7 @@ async def purge( min_purge_time: float | None = None, max_purge_time: float | None = None, prompt: bool | None = None, + preopen_cb: Callable[[], Coroutine | Any] | None = None, ): """Purges the system. @@ -301,6 +302,8 @@ async def purge( prompt Whether to show a prompt to stop or cancel the purge. If ``None``, determined from the instance ``interactive`` attribute. + preopen_cb + A callback to run before opening the purge valve. """ @@ -322,6 +325,12 @@ async def purge( self._kb_monitor(action="purge") try: + if preopen_cb: + if asyncio.iscoroutinefunction(preopen_cb): + await preopen_cb() + else: + preopen_cb() + await valve_handler.start_fill( min_open_time=min_purge_time or 0.0, max_open_time=max_purge_time, @@ -352,6 +361,7 @@ async def fill( min_fill_time: float | None = None, max_fill_time: float | None = None, prompt: bool | None = None, + preopen_cb: Callable[[], Coroutine | Any] | None = None, ): """Fills the selected cameras. @@ -376,6 +386,8 @@ async def fill( prompt Whether to show a prompt to stop or cancel the fill. If ``None``, determined from the instance ``interactive`` attribute. + preopen_cb + A callback to run before opening the fill valves. """ @@ -413,6 +425,12 @@ async def fill( self._kb_monitor(action="fill") try: + if preopen_cb: + if asyncio.iscoroutinefunction(preopen_cb): + await preopen_cb() + else: + preopen_cb() + await asyncio.gather(*fill_tasks) if not self.aborted: diff --git a/src/lvmcryo/runner.py b/src/lvmcryo/runner.py index 8e2c2df..7131149 100644 --- a/src/lvmcryo/runner.py +++ b/src/lvmcryo/runner.py @@ -31,6 +31,7 @@ if TYPE_CHECKING: from lvmcryo.config import Config + from lvmcryo.tools import DBHandler __all__ = ["ln2_runner"] @@ -58,6 +59,7 @@ async def ln2_runner( handler: LN2Handler, config: Config, notifier: Notifier | None = None, + db_handler: DBHandler | None = None, ): """Runs the purge/fill process. @@ -128,6 +130,7 @@ async def ln2_runner( min_purge_time=config.min_purge_time, max_purge_time=max_purge_time, prompt=not config.no_prompt, + preopen_cb=db_handler.write if db_handler is not None else None, ) if handler.failed or handler.aborted: @@ -143,6 +146,7 @@ async def ln2_runner( min_fill_time=config.min_fill_time, max_fill_time=max_fill_time, prompt=not config.no_prompt, + preopen_cb=db_handler.write if db_handler is not None else None, ) if handler.failed or handler.aborted: diff --git a/src/lvmcryo/tools.py b/src/lvmcryo/tools.py index 40ee823..e9d64c5 100644 --- a/src/lvmcryo/tools.py +++ b/src/lvmcryo/tools.py @@ -10,13 +10,15 @@ import asyncio import contextlib +import json import logging import os import pathlib import time +import warnings from contextlib import suppress from functools import partial -from logging import getLogger +from logging import FileHandler, getLogger from typing import TYPE_CHECKING, Any @@ -32,6 +34,7 @@ if TYPE_CHECKING: from datetime import datetime + from lvmcryo.config import Config from lvmcryo.handlers.ln2 import LN2Handler @@ -276,54 +279,125 @@ def date_json(date: datetime | None) -> str | None: return date.isoformat() if date else None -async def write_fill_to_db( - handler: LN2Handler, - api_db_route: str = "http://lvm-hub.lco.cl:8090/api/spectrographs/fills/register", - plot_paths: dict[str, pathlib.Path] = {}, - db_extra_payload: dict[str, Any] = {}, -): - """Records the fill to the database. +class DBHandler: + """Handles writing the fill to the database. Parameters ---------- + action + The action being performed. handler The `.LN2Handler` instance. + config + The configuration object. api_db_route The API route to write the data to the database. - plot_paths - A dictionary with the paths to the plots. - db_extra_payload - Extra payload to send to the database registration endpoint. - - Returns - ------- - pk - The primary key of the new record in the database. + json_handler + The logging handler used to write JSON data. """ - event_times = handler.event_times - - async with httpx.AsyncClient(follow_redirects=True) as client: - response = await client.post( - api_db_route, - json={ - "start_time": date_json(event_times.start_time), - "end_time": date_json(event_times.end_time), - "purge_start": date_json(event_times.purge_start), - "purge_complete": date_json(event_times.purge_complete), - "fill_start": date_json(event_times.fill_start), - "fill_complete": date_json(event_times.fill_complete), - "fail_time": date_json(event_times.fail_time), - "abort_time": date_json(event_times.abort_time), - "failed": handler.failed, - "aborted": handler.aborted, - "plot_paths": {k: str(v) for k, v in plot_paths.items()}, - **db_extra_payload, - }, - ) - response.raise_for_status() + def __init__( + self, + action: str, + handler: LN2Handler, + config: Config, + api_route: str | None = None, + json_handler: FileHandler | None = None, + ) -> None: + self.pk: int | None = None + + self.action = action + self.handler = handler + self.config = config + self.api_route = api_route or config.internal_config["api_routes.register_fill"] + + self.json_handler = json_handler + + def get_log_data(self): + """Returns the log data for the fill.""" + + if self.json_handler: + self.json_handler.flush() + json_path = pathlib.Path(self.json_handler.baseFilename) + with json_path.open("r") as ff: + return [json.loads(line) for line in ff.readlines()] - record_id = response.json() + return None + + async def write( + self, + complete: bool = False, + plot_paths: dict[str, pathlib.Path] = {}, + error: Exception | str | None = None, + raise_on_error: bool = False, + ): + """Records the fill to the database. - return record_id + Parameters + ---------- + complete + Whether the action is complete. + plot_paths + A dictionary with the paths to the plots. + error + The error message or ``None`` if no error. + raise_on_error + Whether to raise an exception if the write fails. + + Returns + ------- + pk + The primary key of the new record in the database. + + """ + + event_times = self.handler.event_times + log_path = self.config.log_path + + json_path = self.json_handler.baseFilename if self.json_handler else None + json_file = str(json_path) if json_path and self.config.write_json else None + + configuration_json = self.config.model_dump() | { + valve: valve_model.model_dump() + for valve, valve_model in self.config.valve_info.items() + } + + async with httpx.AsyncClient(follow_redirects=True) as client: + response = await client.post( + self.api_route, + json={ + "action": self.action, + "complete": complete, + "pk": self.pk, + "start_time": date_json(event_times.start_time), + "end_time": date_json(event_times.end_time), + "purge_start": date_json(event_times.purge_start), + "purge_complete": date_json(event_times.purge_complete), + "fill_start": date_json(event_times.fill_start), + "fill_complete": date_json(event_times.fill_complete), + "fail_time": date_json(event_times.fail_time), + "abort_time": date_json(event_times.abort_time), + "failed": self.handler.failed, + "aborted": self.handler.aborted, + "plot_paths": {k: str(v) for k, v in plot_paths.items()}, + "log_file": str(log_path) if log_path else None, + "valve_times": self.handler.get_valve_times(as_string=True), + "json_file": json_file, + "log_data": self.get_log_data(), + "configuration": configuration_json, + "error": str(error) if error is not None else None, + }, + ) + + if response.status_code != 200: + if raise_on_error: + raise RuntimeError(f"Error writing to the DB: {response.text}") + else: + warnings.warn(f"Error writing to the DB: {response.text}") + + return self.pk + + self.pk = response.json() + + return self.pk