diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index c2c3024a..81da069d 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -19,24 +19,96 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from typing import Any, Iterable +from enum import Enum +from typing import Annotated, Any, Iterable, Optional +from flask import Flask, request +from fastapi import FastAPI, APIRouter, Depends, Path import sqlalchemy import sqlalchemy.dialects.postgresql -from flask import Flask, request +from pydantic import BaseModel, Field, validator +from safir.metadata import Metadata, get_metadata from .utils import setup_logging, setup_postgres -OBS_TYPE_LIST = ["exposure", "visit1", "ccdexposure", "ccdvisit1"] -DTYPE_LIST = ["bool", "int", "float", "str"] +internal_router = APIRouter() +external_router = APIRouter() + + +class ObsTypeEnum(str, Enum): + EXPOSURE = "exposure" + VISIT1 = "visit1" + CCD_EXPOSURE = "ccdexposure" + CCD_VISIT1 = "ccdvisit1" + + @classmethod + def _missing_(cls, value): + """Makes the enum case-insensitive, see https://docs.python.org/3/library/enum.html""" + value = value.lower() + for member in cls: + if member.value == value: + return member + return None + + +ObservationIdType = int + +AllowedFlexType = bool | int | float | str +AllowedFlexTypeEnum = Enum( + "AllowedFlexTypeEnum", {t.__name__.upper(): t.__name__ for t in AllowedFlexType.__args__} +) + + +def convert_to_flex_type(ty: AllowedFlexTypeEnum, v: str) -> AllowedFlexType: + """Converts a string containing a flex database value into the appropriate type. + + Raises + ====== + RuntimeError if ty does not match an allowed flex type + + ValueError if the conversion is invalid + """ + if ty.value == "bool": # Special case + return v.lower() in ("true", "t", "1") + m = [t for t in AllowedFlexType.__args__ if t.__name__ == ty] + assert len(m) == 1 + return m[0](v) + + +class ObsIdColname(str, Enum): + CCD_VISIT_ID = "ccdvisit_id" + VISIT_ID = "visit_id" + CCDEXPOSURE_ID = "ccdexposure_id" + EXPOSURE_ID = "exposure_id" + OBS_ID = "obs_id" + + @classmethod + def _missing_(cls, value): + value = value.lower() + for member in cls: + if member.value == value: + return member + return None + + +def validate_instrument_name( + instrument: str = Path(..., description="Must be a valid instrument name (e.g., ``LATISS``)"), +) -> str: + global instrument_tables + instrument_lower = instrument.lower() + if instrument_lower not in [i.lower() for i in instrument_tables.instrument_list]: + raise HTTPException( + status_code=400, + detail=f"Invalid instrument name {instrument}, must be one of " + + ",".join(instrument_tables.instrument_list), + ) + return instrument_lower -OBS_ID_COLNAME_LIST = ["ccdvisit_id", "visit_id", "ccdexposure_id", "exposure_id", "obs_id"] #################### # Global app setup # #################### - -app = Flask(__name__) +app = FastAPI() engine = setup_postgres() logger = setup_logging(__name__) @@ -51,9 +123,7 @@ class InstrumentTables: def __init__(self): inspector = sqlalchemy.inspect(engine) - self.instrument_list = [ - name[4:] for name in inspector.get_schema_names() if name.startswith("cdb_") - ] + self.instrument_list = [name[4:] for name in inspector.get_schema_names() if name.startswith("cdb_")] self.table_names = set() self.schemas = dict() self.flexible_metadata_schemas = dict() @@ -66,11 +136,13 @@ def __init__(self): self.obs_id_column[instrument] = dict() self.flexible_metadata_schemas[instrument] = dict() for table in md.tables: - for col_name in OBS_ID_COLNAME_LIST: + for col_name in ObsIdColname: + col_name = col_name.value if col_name in md.tables[table].columns: self.obs_id_column[instrument][table] = col_name break - for obs_type in OBS_TYPE_LIST: + for obs_type in ObsTypeEnum: + obs_type = obs_type.value table_name = f"cdb_{instrument}.{obs_type}_flexdata" schema_table_name = table_name + "_schema" if table_name in md.tables and schema_table_name in md.tables: @@ -326,6 +398,7 @@ def to_dict(self) -> dict[str, Any]: return data +''' @app.errorhandler(BadJsonException) def handle_bad_json(e: BadJsonException) -> tuple[dict[str, Any], int]: """Handle a BadJsonException by returning its content as JSON.""" @@ -342,6 +415,7 @@ def handle_bad_value(e: BadValueException) -> tuple[dict[str, Any], int]: def handle_sql_error(e: sqlalchemy.exc.SQLAlchemyError) -> tuple[dict[str, str], int]: """Handle a SQLAlchemyError by returning its content as JSON.""" return {"message": str(e)}, 500 +''' ################################### @@ -349,8 +423,15 @@ def handle_sql_error(e: sqlalchemy.exc.SQLAlchemyError) -> tuple[dict[str, str], ################################### -@app.get("/") -def root() -> dict[str, list[str]]: +@internal_router.get( + "/", + description="Metadata and health check endpoint.", + include_in_schema=False, + response_model=Metadata, + response_model_exclude_none=True, + summary="Application metadata", +) +async def internal_root() -> Metadata: """Root URL for liveness checks. Returns @@ -359,128 +440,123 @@ def root() -> dict[str, list[str]]: JSON response with a list of instruments, observation types, and data types. """ - global instrument_tables + return get_metadata( + package_name="consdb-pqserver", + application_name=config.name, + ) - # Don't log liveness checks. - data = { - "instruments": instrument_tables.instrument_list, - "obs_types": OBS_TYPE_LIST, - "dtypes": DTYPE_LIST, - } - return data +class Index(BaseModel): + """Metadata returned by the external root URL.""" -@app.get("/consdb") -def root2() -> dict[str, list[str]]: - """Application root URL. + instruments: list[str] = Field(..., title="Available instruments") + obs_types: list[str] = Field(..., title="Available observation types") + dtypes: list[str] = Field(..., title="Allowed data types in flexible metadata") - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with a list of instruments, observation types, and - data types. - """ + +@external_router.get( + "/", + description="Application root", + response_model=Index, + response_model_exclude_none=True, + summary="Application root", +) +async def external_root() -> Index: + """Application root URL /consdb/.""" global instrument_tables logger.info(request) - data = { - "instruments": instrument_tables.instrument_list, - "obs_types": OBS_TYPE_LIST, - "dtypes": DTYPE_LIST, - } - return data - - -@app.post("/consdb/flex///addkey") -def add_flexible_metadata_key(instrument: str, obs_type: str) -> dict[str, Any] | tuple[dict[str, str], int]: - """Add a key to a flexible metadata table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - key: `str` - Key to add (POST JSON data). - dtype: `str` - Data type of key's value from ``DTYPE_LIST`` (POST JSON data). - doc: `str` - Documentation string (POST JSON data). - unit: `str`, optional - Unit for value (POST JSON data). - ucd: `str`, optional - IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/) - for value (POST JSON data). - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. - - BadValueException - Raised if instrument or observation type is invalid. - """ + return Index( + instruments=instrument_tables.instrument_list, + obs_types=[o.value for o in ObsTypeEnum], + dtypes=[d.value for d in AllowedTypesEnum], + ) + + +class AddKeyRequestModel(BaseModel): + key: str = Field(..., title="The name of the added key") + dtype: AllowedFlexTypeEnum = Field(..., title="Data type for the added key") + doc: str = Field(..., title="Documentation string for the new key") + unit: Optional[str] = Field(..., title="Unit for value") + ucd: Optional[str] = Field( + ..., title="IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/)" + ) + + @validator("unit") + def validate_unit(cls, v): + try: + unit = astropy.units.Unit(v) + except ValueError: + raise ValueError(f"'{v}' is a not a valid unit.") + return v + + @validator("ucd") + def validate_ucd(cls, v): + if not astropy.io.votable.ucd.check_ucd(v): + raise ValueError(f"'{v}' is not a valid IVOA UCD.") + return v + + +class AddKeyResponseModel(BaseModel): + """Response model for the addkey endpoint.""" + + message: str = Field(..., title="Human-readable response message") + key: str = Field(..., title="The name of the added key") + instrument: str = (Depends(validate_instrument_name),) + obs_type: ObsTypeEnum = Field(..., title="The observation type that owns the new key") + + +@external_router.post( + "/flex/{instrument}/{obs_type}/addkey", + summary="Add a flexible metadata key for the specified instrument and obs_type.", + response_model=AddKeyResponseModel, +) +async def add_flexible_metadata_key( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + data: AddKeyRequestModel, +) -> AddKeyResponseModel: + """Add a key to a flexible metadata table.""" global instrument_tables logger.info(f"{request} {request.json}") info = _check_json(request.json, "flex addkey", ("key", "dtype", "doc")) schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type) - key = info["key"] - dtype = info["dtype"] - if dtype not in DTYPE_LIST: - raise BadValueException("dtype", dtype, DTYPE_LIST) - doc = info["doc"] - unit = info.get("unit") - ucd = info.get("ucd") - stmt = sqlalchemy.insert(schema_table).values(key=key, dtype=dtype, doc=doc, unit=unit, ucd=ucd) + stmt = sqlalchemy.insert(schema_table).values( + key=data.key, + dtype=data.dtype, + doc=data.doc, + unit=data.unit, + ucd=data.ucd, + ) logger.debug(str(stmt)) with engine.connect() as conn: _ = conn.execute(stmt) conn.commit() # Update cached copy without re-querying database. instrument_tables.flexible_metadata_schemas[instrument.lower()][obs_type.lower()][key] = [ - dtype, - doc, - unit, - ucd, + data.dtype, + data.doc, + data.unit, + data.ucd, ] - return { - "message": "Key added to flexible metadata", - "key": key, - "instrument": instrument, - "obs_type": obs_type, - } - - -@app.get("/consdb/flex///schema") -def get_flexible_metadata_keys(instrument: str, obs_type: str) -> dict[str, list[str | None]]: - """Retrieve descriptions of keys for a flexible metadata table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - - Returns - ------- - json_dict: `dict` [ `str`, `list` [ `str` | `None` ] ] - JSON response with 200 HTTP status on success. - Response is a dictionary of ``dtype``, ``doc``, ``unit``, and ``ucd`` - strings for each key in the table. - - Raises - ------ - BadValueException - Raised if instrument or observation type is invalid. - """ + return AddKeyResponse( + message="Key added to flexible metadata", + key=data.key, + instrument=instrument, + obs_type=data.obs_type, + ) + + +@external_router.get( + "/flex/{instrument}/{obs_type}/schema", + description="Flex schema for the given instrument and observation type.", +) +async def get_flexible_metadata_keys( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, +) -> dict[str, list[str | None]]: + """Returns the flex schema for the given instrument and observation type.""" global instrument_tables logger.info(request) @@ -491,36 +567,19 @@ def get_flexible_metadata_keys(instrument: str, obs_type: str) -> dict[str, list return instrument_tables.flexible_metadata_schemas[instrument][obs_type] -@app.get("/consdb/flex///obs/") -def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int) -> dict[str, Any]: - """Retrieve values for an observation from a flexible metadata table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - obs_id: `int` - Unique observation identifier. - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - Response is a dictionary of ``key``, ``value`` pairs with values - converted from strings. - - Raises - ------ - BadValueException - Raised if instrument or observation type is invalid. - """ +@external_router.get( + "/flex/{instrument}/{obs_type}/obs/{obs_id}", + description="Flex schema for the given instrument and observation type.", +) +async def get_flexible_metadata( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + obs_id: ObservationIdType, +) -> dict[str, AllowedFlexType]: + """Retrieve values for an observation from a flexible metadata table.""" global instrument_tables logger.info(request) - instrument = instrument.lower() - obs_type = obs_type.lower() table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] result = dict() @@ -536,55 +595,29 @@ def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int) -> dict[s instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] dtype = schema[key][0] - if dtype == "bool": - result[key] = value == "True" - elif dtype == "int": - result[key] = int(value) - elif dtype == "float": - result[key] = float(value) - else: - result[key] = str(value) + result[key] = convert_to_flex_type(AllowedFlexTypeEnum(key), value) return result -@app.post("/consdb/flex///obs/") -def insert_flexible_metadata( - instrument: str, obs_type: str, obs_id: int -) -> dict[str, Any] | tuple[dict[str, str], int]: - """Insert or update key/value pairs in a flexible metadata table. +class GenericResponse(BaseModel): + message: str = Field(..., title="Human-readable response message") + instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") + obs_type: ObsTypeEnum = Field(..., title="The observation type (e.g., ``exposure``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") + table: Optional[str] = Field(..., title="Table name") - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - obs_id: `int` - Unique observation identifier. - u: `str` - Allow update if set to "1" (URL query parameter). - values: `dict` [ `str`, `Any` ] - Dictionary of key/value pairs to insert or update (JSON POST data). - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. - - BadValueException - Raised if instrument or observation type is invalid. - """ +@external_router.post("/flex/{instrument}/{obs_type}/obs/{obs_id}") +def insert_flexible_metadata( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + obs_id: ObservationIdType, +) -> GenericResponse: + """Insert or update key/value pairs in a flexible metadata table.""" global instrument_tables logger.info(f"{request} {request.json}") info = _check_json(request.json, "flex obs", ("values",)) - instrument = instrument.lower() - obs_type = obs_type.lower() table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] @@ -598,14 +631,8 @@ def insert_flexible_metadata( # check value against dtype dtype = schema[key][0] - if dtype == "bool" and not isinstance(value, bool): - raise BadValueException("bool value", value) - elif dtype == "int" and not isinstance(value, int): - raise BadValueException("int value", value) - elif dtype == "float" and not isinstance(value, float): - raise BadValueException("float value", value) - elif dtype == "str" and not isinstance(value, str): - raise BadValueException("str value", value) + if dtype != type(value).__name__: + raise BadValueException(f"{dtype} value", value) with engine.connect() as conn: for key, value in value_dict.items(): @@ -623,44 +650,21 @@ def insert_flexible_metadata( _ = conn.execute(stmt) conn.commit() - return { - "message": "Flexible metadata inserted", - "obs_id": obs_id, - "instrument": instrument, - "obs_type": obs_type, - } - - -@app.post("/consdb/insert///obs/") -def insert(instrument: str, table: str, obs_id: int) -> dict[str, Any] | tuple[dict[str, str], int]: - """Insert or update column/value pairs in a ConsDB table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - table: `str` - Name of table to insert into. - obs_id: `int` - Unique observation identifier. - u: `str` - Allow update if set to "1" (URL query parameter). - values: `dict` [ `str`, `Any` ] - Dictionary of key/value pairs to insert or update (JSON POST data). - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. - - BadValueException - Raised if instrument or observation type is invalid. - """ + return GenericResponse( + message="Flexible metadata inserted", + obs_id=obs_id, + instrument=instrument, + obs_type=obs_type, + ) + + +@external_router.post("/insert/{instrument}/{table}/obs/{obs_id}") +def insert( + instrument: Annotated[str, Depends(validate_instrument_name)], + table: str, + obs_id: ObservationIdType, +) -> GenericResponse: + """Insert or update column/value pairs in a ConsDB table.""" global instrument_tables logger.info(f"{request} {request.json}") @@ -690,16 +694,19 @@ def insert(instrument: str, table: str, obs_id: int) -> dict[str, Any] | tuple[d with engine.connect() as conn: _ = conn.execute(stmt) conn.commit() - return { - "message": "Data inserted", - "instrument": instrument, - "table": table_name, - "obs_id": obs_id, - } - - -@app.post("/consdb/insert//
") -def insert_multiple(instrument: str, table: str) -> dict[str, Any] | tuple[dict[str, str], int]: + return GenericResponse( + message="Data inserted", + instrument=instrument, + table=table_name, + obs_id=obs_id, + ) + + +@external_router.post("/insert/{instrument}/{table}") +def insert_multiple( + instrument: Annotated[str, Depends(validate_instrument_name)], + table: str, +) -> dict[str, Any] | tuple[dict[str, str], int]: """Insert or update multiple observations in a ConsDB table. Parameters @@ -745,7 +752,7 @@ def insert_multiple(instrument: str, table: str) -> dict[str, Any] | tuple[dict[ with engine.connect() as conn: for obs_id, valdict in info["obs_dict"]: - if not isinstance(obs_id, int): + if not isinstance(obs_id, ObservationIdType): raise BadValueException("obs_id value", obs_id) valdict[obs_id_colname] = obs_id @@ -763,18 +770,20 @@ def insert_multiple(instrument: str, table: str) -> dict[str, Any] | tuple[dict[ _ = conn.execute(stmt) conn.commit() - return { - "message": "Data inserted", - "table": table_name, - "instrument": instrument, - "obs_ids": info["obs_dict"].keys(), - } + return GenericResponse( + message="Data inserted", + table=table_name, + instrument=instrument, + obs_id=info["obs_dict"].keys(), + ) -@app.get("/consdb/query///obs/") +@external_router.get("/query/{instrument}/{obs_type}/obs/{obs_id}") def get_all_metadata( - instrument: str, obs_type: str, obs_id: int -) -> dict[str, Any] | tuple[dict[str, str], int]: + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + obs_id: ObservationIdType, +) -> dict[str, Any]: """Get all information about an observation. Parameters @@ -818,7 +827,7 @@ def get_all_metadata( return result -@app.post("/consdb/query") +@external_router.post("/query") def query() -> dict[str, Any] | tuple[dict[str, str], int]: """Query the ConsDB database. @@ -856,59 +865,29 @@ def query() -> dict[str, Any] | tuple[dict[str, str], int]: return result -@app.get("/consdb/schema") +@external_router.get("/schema") def list_instruments() -> list[str]: - """Retrieve the list of instruments available in ConsDB." - - Returns - ------- - json_list: `list` [ `str` ] - JSON response with 200 HTTP status on success. - Response is a list of instrument names. - - Raises - ------ - BadValueException - Raised if instrument is invalid. - """ + """Retrieve the list of instruments available in ConsDB.""" global instrument_tables logger.info(request) return instrument_tables.instrument_list -@app.get("/consdb/schema/") -def list_table(instrument: str) -> list[str]: - """Retrieve the list of tables for an instrument. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - - Returns - ------- - json_list: `list` [ `str` ] - JSON response with 200 HTTP status on success. - Response is a list of table names. - - Raises - ------ - BadValueException - Raised if instrument is invalid. - """ +@external_router.get("/consdb/schema/{instrument}") +def list_table( + instrument: Annotated[str, Depends(validate_instrument_name)], +) -> list[str]: + """Retrieve the list of tables for an instrument.""" global instrument_tables logger.info(request) - instrument = instrument.lower() - if instrument not in instrument_tables.schemas: - raise BadValueException("instrument", instrument, list(instrument_tables.schemas.keys())) schema = instrument_tables.schemas[instrument] return list(schema.tables.keys()) -@app.get("/consdb/schema//
") -def schema(instrument: str, table: str) -> dict[str, list[str]]: +@external_router.get("/schema/{instrument}/
") +def schema(instrument: Annotated[str, Depends(validate_instrument_name)], table: str) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. Parameters @@ -933,11 +912,8 @@ def schema(instrument: str, table: str) -> dict[str, list[str]]: global instrument_tables logger.info(request) - instrument = instrument.lower() - if instrument not in instrument_tables.schemas: - raise BadValueException("instrument", instrument, list(instrument_tables.schemas.keys())) schema = instrument_tables.schemas[instrument] - if not table.startswith(f"cdb_{instrument}"): + if not table.startswith(f"cdb_{instrument}."): table = f"cdb_{instrument}.{table}" table = table.lower() if table not in schema.tables: