From 164535c5fad326b3161f1a17ec282ce6d66331df Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Mon, 29 Jul 2024 16:49:53 -0700 Subject: [PATCH 01/13] A very rough first pass changing Flask to FastAPI --- python/lsst/consdb/pqserver.py | 561 ++++++++++++++++----------------- 1 file changed, 269 insertions(+), 292 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 238fcacb..81da069d 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -19,25 +19,96 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from typing import Any, Iterable +from enum import Enum +from typing import Annotated, Any, Iterable, Optional +from flask import Flask, request +from fastapi import FastAPI, APIRouter, Depends, Path import sqlalchemy import sqlalchemy.dialects.postgresql -from flask import Flask, request - +from pydantic import BaseModel, Field, validator +from safir.metadata import Metadata, get_metadata from .utils import setup_logging, setup_postgres -OBS_TYPE_LIST = ["exposure", "visit1", "ccdexposure", "ccdvisit1"] -DTYPE_LIST = ["bool", "int", "float", "str"] +internal_router = APIRouter() +external_router = APIRouter() + + +class ObsTypeEnum(str, Enum): + EXPOSURE = "exposure" + VISIT1 = "visit1" + CCD_EXPOSURE = "ccdexposure" + CCD_VISIT1 = "ccdvisit1" + + @classmethod + def _missing_(cls, value): + """Makes the enum case-insensitive, see https://docs.python.org/3/library/enum.html""" + value = value.lower() + for member in cls: + if member.value == value: + return member + return None + + +ObservationIdType = int + +AllowedFlexType = bool | int | float | str +AllowedFlexTypeEnum = Enum( + "AllowedFlexTypeEnum", {t.__name__.upper(): t.__name__ for t in AllowedFlexType.__args__} +) + + +def convert_to_flex_type(ty: AllowedFlexTypeEnum, v: str) -> AllowedFlexType: + """Converts a string containing a flex database value into the appropriate type. + + Raises + ====== + RuntimeError if ty does not match an allowed flex type + + ValueError if the conversion is invalid + """ + if ty.value == "bool": # Special case + return v.lower() in ("true", "t", "1") + m = [t for t in AllowedFlexType.__args__ if t.__name__ == ty] + assert len(m) == 1 + return m[0](v) + + +class ObsIdColname(str, Enum): + CCD_VISIT_ID = "ccdvisit_id" + VISIT_ID = "visit_id" + CCDEXPOSURE_ID = "ccdexposure_id" + EXPOSURE_ID = "exposure_id" + OBS_ID = "obs_id" + + @classmethod + def _missing_(cls, value): + value = value.lower() + for member in cls: + if member.value == value: + return member + return None + + +def validate_instrument_name( + instrument: str = Path(..., description="Must be a valid instrument name (e.g., ``LATISS``)"), +) -> str: + global instrument_tables + instrument_lower = instrument.lower() + if instrument_lower not in [i.lower() for i in instrument_tables.instrument_list]: + raise HTTPException( + status_code=400, + detail=f"Invalid instrument name {instrument}, must be one of " + + ",".join(instrument_tables.instrument_list), + ) + return instrument_lower -OBS_ID_COLNAME_LIST = ["ccdvisit_id", "visit_id", "ccdexposure_id", "exposure_id", "obs_id"] #################### # Global app setup # #################### - -app = Flask(__name__) +app = FastAPI() engine = setup_postgres() logger = setup_logging(__name__) @@ -65,11 +136,13 @@ def __init__(self): self.obs_id_column[instrument] = dict() self.flexible_metadata_schemas[instrument] = dict() for table in md.tables: - for col_name in OBS_ID_COLNAME_LIST: + for col_name in ObsIdColname: + col_name = col_name.value if col_name in md.tables[table].columns: self.obs_id_column[instrument][table] = col_name break - for obs_type in OBS_TYPE_LIST: + for obs_type in ObsTypeEnum: + obs_type = obs_type.value table_name = f"cdb_{instrument}.{obs_type}_flexdata" schema_table_name = table_name + "_schema" if table_name in md.tables and schema_table_name in md.tables: @@ -325,6 +398,7 @@ def to_dict(self) -> dict[str, Any]: return data +''' @app.errorhandler(BadJsonException) def handle_bad_json(e: BadJsonException) -> tuple[dict[str, Any], int]: """Handle a BadJsonException by returning its content as JSON.""" @@ -341,6 +415,7 @@ def handle_bad_value(e: BadValueException) -> tuple[dict[str, Any], int]: def handle_sql_error(e: sqlalchemy.exc.SQLAlchemyError) -> tuple[dict[str, str], int]: """Handle a SQLAlchemyError by returning its content as JSON.""" return {"message": str(e)}, 500 +''' ################################### @@ -348,8 +423,15 @@ def handle_sql_error(e: sqlalchemy.exc.SQLAlchemyError) -> tuple[dict[str, str], ################################### -@app.get("/") -def root() -> dict[str, list[str]]: +@internal_router.get( + "/", + description="Metadata and health check endpoint.", + include_in_schema=False, + response_model=Metadata, + response_model_exclude_none=True, + summary="Application metadata", +) +async def internal_root() -> Metadata: """Root URL for liveness checks. Returns @@ -358,128 +440,123 @@ def root() -> dict[str, list[str]]: JSON response with a list of instruments, observation types, and data types. """ - global instrument_tables + return get_metadata( + package_name="consdb-pqserver", + application_name=config.name, + ) - # Don't log liveness checks. - data = { - "instruments": instrument_tables.instrument_list, - "obs_types": OBS_TYPE_LIST, - "dtypes": DTYPE_LIST, - } - return data +class Index(BaseModel): + """Metadata returned by the external root URL.""" -@app.get("/consdb") -def root2() -> dict[str, list[str]]: - """Application root URL. + instruments: list[str] = Field(..., title="Available instruments") + obs_types: list[str] = Field(..., title="Available observation types") + dtypes: list[str] = Field(..., title="Allowed data types in flexible metadata") - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with a list of instruments, observation types, and - data types. - """ + +@external_router.get( + "/", + description="Application root", + response_model=Index, + response_model_exclude_none=True, + summary="Application root", +) +async def external_root() -> Index: + """Application root URL /consdb/.""" global instrument_tables logger.info(request) - data = { - "instruments": instrument_tables.instrument_list, - "obs_types": OBS_TYPE_LIST, - "dtypes": DTYPE_LIST, - } - return data - - -@app.post("/consdb/flex///addkey") -def add_flexible_metadata_key(instrument: str, obs_type: str) -> dict[str, Any] | tuple[dict[str, str], int]: - """Add a key to a flexible metadata table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - key: `str` - Key to add (POST JSON data). - dtype: `str` - Data type of key's value from ``DTYPE_LIST`` (POST JSON data). - doc: `str` - Documentation string (POST JSON data). - unit: `str`, optional - Unit for value (POST JSON data). - ucd: `str`, optional - IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/) - for value (POST JSON data). - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. - - BadValueException - Raised if instrument or observation type is invalid. - """ + return Index( + instruments=instrument_tables.instrument_list, + obs_types=[o.value for o in ObsTypeEnum], + dtypes=[d.value for d in AllowedTypesEnum], + ) + + +class AddKeyRequestModel(BaseModel): + key: str = Field(..., title="The name of the added key") + dtype: AllowedFlexTypeEnum = Field(..., title="Data type for the added key") + doc: str = Field(..., title="Documentation string for the new key") + unit: Optional[str] = Field(..., title="Unit for value") + ucd: Optional[str] = Field( + ..., title="IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/)" + ) + + @validator("unit") + def validate_unit(cls, v): + try: + unit = astropy.units.Unit(v) + except ValueError: + raise ValueError(f"'{v}' is a not a valid unit.") + return v + + @validator("ucd") + def validate_ucd(cls, v): + if not astropy.io.votable.ucd.check_ucd(v): + raise ValueError(f"'{v}' is not a valid IVOA UCD.") + return v + + +class AddKeyResponseModel(BaseModel): + """Response model for the addkey endpoint.""" + + message: str = Field(..., title="Human-readable response message") + key: str = Field(..., title="The name of the added key") + instrument: str = (Depends(validate_instrument_name),) + obs_type: ObsTypeEnum = Field(..., title="The observation type that owns the new key") + + +@external_router.post( + "/flex/{instrument}/{obs_type}/addkey", + summary="Add a flexible metadata key for the specified instrument and obs_type.", + response_model=AddKeyResponseModel, +) +async def add_flexible_metadata_key( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + data: AddKeyRequestModel, +) -> AddKeyResponseModel: + """Add a key to a flexible metadata table.""" global instrument_tables logger.info(f"{request} {request.json}") info = _check_json(request.json, "flex addkey", ("key", "dtype", "doc")) schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type) - key = info["key"] - dtype = info["dtype"] - if dtype not in DTYPE_LIST: - raise BadValueException("dtype", dtype, DTYPE_LIST) - doc = info["doc"] - unit = info.get("unit") - ucd = info.get("ucd") - stmt = sqlalchemy.insert(schema_table).values(key=key, dtype=dtype, doc=doc, unit=unit, ucd=ucd) + stmt = sqlalchemy.insert(schema_table).values( + key=data.key, + dtype=data.dtype, + doc=data.doc, + unit=data.unit, + ucd=data.ucd, + ) logger.debug(str(stmt)) with engine.connect() as conn: _ = conn.execute(stmt) conn.commit() # Update cached copy without re-querying database. instrument_tables.flexible_metadata_schemas[instrument.lower()][obs_type.lower()][key] = [ - dtype, - doc, - unit, - ucd, + data.dtype, + data.doc, + data.unit, + data.ucd, ] - return { - "message": "Key added to flexible metadata", - "key": key, - "instrument": instrument, - "obs_type": obs_type, - } - - -@app.get("/consdb/flex///schema") -def get_flexible_metadata_keys(instrument: str, obs_type: str) -> dict[str, list[str | None]]: - """Retrieve descriptions of keys for a flexible metadata table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - - Returns - ------- - json_dict: `dict` [ `str`, `list` [ `str` | `None` ] ] - JSON response with 200 HTTP status on success. - Response is a dictionary of ``dtype``, ``doc``, ``unit``, and ``ucd`` - strings for each key in the table. - - Raises - ------ - BadValueException - Raised if instrument or observation type is invalid. - """ + return AddKeyResponse( + message="Key added to flexible metadata", + key=data.key, + instrument=instrument, + obs_type=data.obs_type, + ) + + +@external_router.get( + "/flex/{instrument}/{obs_type}/schema", + description="Flex schema for the given instrument and observation type.", +) +async def get_flexible_metadata_keys( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, +) -> dict[str, list[str | None]]: + """Returns the flex schema for the given instrument and observation type.""" global instrument_tables logger.info(request) @@ -490,36 +567,19 @@ def get_flexible_metadata_keys(instrument: str, obs_type: str) -> dict[str, list return instrument_tables.flexible_metadata_schemas[instrument][obs_type] -@app.get("/consdb/flex///obs/") -def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int) -> dict[str, Any]: - """Retrieve values for an observation from a flexible metadata table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - obs_id: `int` - Unique observation identifier. - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - Response is a dictionary of ``key``, ``value`` pairs with values - converted from strings. - - Raises - ------ - BadValueException - Raised if instrument or observation type is invalid. - """ +@external_router.get( + "/flex/{instrument}/{obs_type}/obs/{obs_id}", + description="Flex schema for the given instrument and observation type.", +) +async def get_flexible_metadata( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + obs_id: ObservationIdType, +) -> dict[str, AllowedFlexType]: + """Retrieve values for an observation from a flexible metadata table.""" global instrument_tables logger.info(request) - instrument = instrument.lower() - obs_type = obs_type.lower() table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] result = dict() @@ -535,55 +595,29 @@ def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int) -> dict[s instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] dtype = schema[key][0] - if dtype == "bool": - result[key] = value == "True" - elif dtype == "int": - result[key] = int(value) - elif dtype == "float": - result[key] = float(value) - else: - result[key] = str(value) + result[key] = convert_to_flex_type(AllowedFlexTypeEnum(key), value) return result -@app.post("/consdb/flex///obs/") -def insert_flexible_metadata( - instrument: str, obs_type: str, obs_id: int -) -> dict[str, Any] | tuple[dict[str, str], int]: - """Insert or update key/value pairs in a flexible metadata table. +class GenericResponse(BaseModel): + message: str = Field(..., title="Human-readable response message") + instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") + obs_type: ObsTypeEnum = Field(..., title="The observation type (e.g., ``exposure``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") + table: Optional[str] = Field(..., title="Table name") - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - obs_type: `str` - Name of the observation type (e.g. ``Exposure``). - obs_id: `int` - Unique observation identifier. - u: `str` - Allow update if set to "1" (URL query parameter). - values: `dict` [ `str`, `Any` ] - Dictionary of key/value pairs to insert or update (JSON POST data). - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. - - BadValueException - Raised if instrument or observation type is invalid. - """ +@external_router.post("/flex/{instrument}/{obs_type}/obs/{obs_id}") +def insert_flexible_metadata( + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + obs_id: ObservationIdType, +) -> GenericResponse: + """Insert or update key/value pairs in a flexible metadata table.""" global instrument_tables logger.info(f"{request} {request.json}") info = _check_json(request.json, "flex obs", ("values",)) - instrument = instrument.lower() - obs_type = obs_type.lower() table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] @@ -597,14 +631,8 @@ def insert_flexible_metadata( # check value against dtype dtype = schema[key][0] - if dtype == "bool" and not isinstance(value, bool): - raise BadValueException("bool value", value) - elif dtype == "int" and not isinstance(value, int): - raise BadValueException("int value", value) - elif dtype == "float" and not isinstance(value, float): - raise BadValueException("float value", value) - elif dtype == "str" and not isinstance(value, str): - raise BadValueException("str value", value) + if dtype != type(value).__name__: + raise BadValueException(f"{dtype} value", value) with engine.connect() as conn: for key, value in value_dict.items(): @@ -622,44 +650,21 @@ def insert_flexible_metadata( _ = conn.execute(stmt) conn.commit() - return { - "message": "Flexible metadata inserted", - "obs_id": obs_id, - "instrument": instrument, - "obs_type": obs_type, - } - - -@app.post("/consdb/insert///obs/") -def insert(instrument: str, table: str, obs_id: int) -> dict[str, Any] | tuple[dict[str, str], int]: - """Insert or update column/value pairs in a ConsDB table. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - table: `str` - Name of table to insert into. - obs_id: `int` - Unique observation identifier. - u: `str` - Allow update if set to "1" (URL query parameter). - values: `dict` [ `str`, `Any` ] - Dictionary of key/value pairs to insert or update (JSON POST data). - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. - - BadValueException - Raised if instrument or observation type is invalid. - """ + return GenericResponse( + message="Flexible metadata inserted", + obs_id=obs_id, + instrument=instrument, + obs_type=obs_type, + ) + + +@external_router.post("/insert/{instrument}/{table}/obs/{obs_id}") +def insert( + instrument: Annotated[str, Depends(validate_instrument_name)], + table: str, + obs_id: ObservationIdType, +) -> GenericResponse: + """Insert or update column/value pairs in a ConsDB table.""" global instrument_tables logger.info(f"{request} {request.json}") @@ -689,16 +694,19 @@ def insert(instrument: str, table: str, obs_id: int) -> dict[str, Any] | tuple[d with engine.connect() as conn: _ = conn.execute(stmt) conn.commit() - return { - "message": "Data inserted", - "instrument": instrument, - "table": table_name, - "obs_id": obs_id, - } - - -@app.post("/consdb/insert//
") -def insert_multiple(instrument: str, table: str) -> dict[str, Any] | tuple[dict[str, str], int]: + return GenericResponse( + message="Data inserted", + instrument=instrument, + table=table_name, + obs_id=obs_id, + ) + + +@external_router.post("/insert/{instrument}/{table}") +def insert_multiple( + instrument: Annotated[str, Depends(validate_instrument_name)], + table: str, +) -> dict[str, Any] | tuple[dict[str, str], int]: """Insert or update multiple observations in a ConsDB table. Parameters @@ -744,7 +752,7 @@ def insert_multiple(instrument: str, table: str) -> dict[str, Any] | tuple[dict[ with engine.connect() as conn: for obs_id, valdict in info["obs_dict"]: - if not isinstance(obs_id, int): + if not isinstance(obs_id, ObservationIdType): raise BadValueException("obs_id value", obs_id) valdict[obs_id_colname] = obs_id @@ -762,18 +770,20 @@ def insert_multiple(instrument: str, table: str) -> dict[str, Any] | tuple[dict[ _ = conn.execute(stmt) conn.commit() - return { - "message": "Data inserted", - "table": table_name, - "instrument": instrument, - "obs_ids": info["obs_dict"].keys(), - } + return GenericResponse( + message="Data inserted", + table=table_name, + instrument=instrument, + obs_id=info["obs_dict"].keys(), + ) -@app.get("/consdb/query///obs/") +@external_router.get("/query/{instrument}/{obs_type}/obs/{obs_id}") def get_all_metadata( - instrument: str, obs_type: str, obs_id: int -) -> dict[str, Any] | tuple[dict[str, str], int]: + instrument: Annotated[str, Depends(validate_instrument_name)], + obs_type: ObsTypeEnum, + obs_id: ObservationIdType, +) -> dict[str, Any]: """Get all information about an observation. Parameters @@ -817,7 +827,7 @@ def get_all_metadata( return result -@app.post("/consdb/query") +@external_router.post("/query") def query() -> dict[str, Any] | tuple[dict[str, str], int]: """Query the ConsDB database. @@ -855,59 +865,29 @@ def query() -> dict[str, Any] | tuple[dict[str, str], int]: return result -@app.get("/consdb/schema") +@external_router.get("/schema") def list_instruments() -> list[str]: - """Retrieve the list of instruments available in ConsDB." - - Returns - ------- - json_list: `list` [ `str` ] - JSON response with 200 HTTP status on success. - Response is a list of instrument names. - - Raises - ------ - BadValueException - Raised if instrument is invalid. - """ + """Retrieve the list of instruments available in ConsDB.""" global instrument_tables logger.info(request) return instrument_tables.instrument_list -@app.get("/consdb/schema/") -def list_table(instrument: str) -> list[str]: - """Retrieve the list of tables for an instrument. - - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - - Returns - ------- - json_list: `list` [ `str` ] - JSON response with 200 HTTP status on success. - Response is a list of table names. - - Raises - ------ - BadValueException - Raised if instrument is invalid. - """ +@external_router.get("/consdb/schema/{instrument}") +def list_table( + instrument: Annotated[str, Depends(validate_instrument_name)], +) -> list[str]: + """Retrieve the list of tables for an instrument.""" global instrument_tables logger.info(request) - instrument = instrument.lower() - if instrument not in instrument_tables.schemas: - raise BadValueException("instrument", instrument, list(instrument_tables.schemas.keys())) schema = instrument_tables.schemas[instrument] return list(schema.tables.keys()) -@app.get("/consdb/schema//
") -def schema(instrument: str, table: str) -> dict[str, list[str]]: +@external_router.get("/schema/{instrument}/
") +def schema(instrument: Annotated[str, Depends(validate_instrument_name)], table: str) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. Parameters @@ -932,11 +912,8 @@ def schema(instrument: str, table: str) -> dict[str, list[str]]: global instrument_tables logger.info(request) - instrument = instrument.lower() - if instrument not in instrument_tables.schemas: - raise BadValueException("instrument", instrument, list(instrument_tables.schemas.keys())) schema = instrument_tables.schemas[instrument] - if not table.startswith(f"cdb_{instrument}"): + if not table.startswith(f"cdb_{instrument}."): table = f"cdb_{instrument}.{table}" table = table.lower() if table not in schema.tables: From ed1a3dc61536ea748bbad271d87b9eb5b5f37ed0 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Mon, 29 Jul 2024 17:02:29 -0700 Subject: [PATCH 02/13] Move from deprecated validator decorator to field_decorator --- python/lsst/consdb/pqserver.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 81da069d..1d30167d 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -26,7 +26,7 @@ from fastapi import FastAPI, APIRouter, Depends, Path import sqlalchemy import sqlalchemy.dialects.postgresql -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, Field, field_validator from safir.metadata import Metadata, get_metadata from .utils import setup_logging, setup_postgres @@ -482,16 +482,16 @@ class AddKeyRequestModel(BaseModel): ..., title="IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/)" ) - @validator("unit") - def validate_unit(cls, v): + @field_validator("unit") + def validate_unit(v): try: unit = astropy.units.Unit(v) except ValueError: raise ValueError(f"'{v}' is a not a valid unit.") return v - @validator("ucd") - def validate_ucd(cls, v): + @field_validator("ucd") + def validate_ucd(v): if not astropy.io.votable.ucd.check_ucd(v): raise ValueError(f"'{v}' is not a valid IVOA UCD.") return v From aa2d69f20b9f910043fc4f74e1468957e7c98575 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 09:21:28 -0700 Subject: [PATCH 03/13] Compatibility fixes for FastAPI --- Dockerfile.pqserver | 6 ++--- python/lsst/consdb/pqserver.py | 49 +++++++++++++++++++++------------- tests/test_pqserver.py | 42 +++++++++++++++-------------- 3 files changed, 56 insertions(+), 41 deletions(-) diff --git a/Dockerfile.pqserver b/Dockerfile.pqserver index 22197876..5fdae7db 100644 --- a/Dockerfile.pqserver +++ b/Dockerfile.pqserver @@ -1,12 +1,12 @@ FROM python:3.11 -RUN pip install flask gunicorn sqlalchemy psycopg2 +RUN pip install fastapi safir astropy uvicorn gunicorn sqlalchemy psycopg2 WORKDIR / -COPY python/lsst/consdb/__init__.py python/lsst/consdb/pqserver.py python/lsst/consdb/utils.py /consdb-pq/ +COPY python/lsst/consdb/__init__.py python/lsst/consdb/pqserver.py python/lsst/consdb/utils.py /consdb_pq/ # Environment variables that must be set: # DB_HOST DB_PASS DB_USER DB_NAME or POSTGRES_URL # Expose the port. EXPOSE 8080 -ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8080", "-w", "2", "consdb-pq.pqserver:app" ] +ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8080", "-w", "2", "-k", "uvicorn.workers.UvicornWorker", "consdb_pq.pqserver:app" ] diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 1d30167d..de030042 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -20,14 +20,15 @@ # along with this program. If not, see . from enum import Enum +from importlib.metadata import metadata, version from typing import Annotated, Any, Iterable, Optional -from flask import Flask, request from fastapi import FastAPI, APIRouter, Depends, Path import sqlalchemy import sqlalchemy.dialects.postgresql from pydantic import BaseModel, Field, field_validator from safir.metadata import Metadata, get_metadata +from safir.middleware.x_forwarded import XForwardedMiddleware from .utils import setup_logging, setup_postgres internal_router = APIRouter() @@ -108,10 +109,21 @@ def validate_instrument_name( # Global app setup # #################### -app = FastAPI() +path_prefix = "/consdb" +app = FastAPI( + title="consdb-pqserver", + description="HTTP API for consdb", + openapi_url=f"{path_prefix}/openapi.json", + docs_url=f"{path_prefix}/docs", + redoc_url=f"{path_prefix}/redoc", +) engine = setup_postgres() logger = setup_logging(__name__) +app.include_router(internal_router) +app.include_router(external_router, prefix=path_prefix) +app.add_middleware(XForwardedMiddleware) + ######################## # Schema preload class # @@ -423,15 +435,13 @@ def handle_sql_error(e: sqlalchemy.exc.SQLAlchemyError) -> tuple[dict[str, str], ################################### -@internal_router.get( +@app.get( "/", description="Metadata and health check endpoint.", include_in_schema=False, - response_model=Metadata, - response_model_exclude_none=True, summary="Application metadata", ) -async def internal_root() -> Metadata: +async def internal_root() -> dict[str, Any]: """Root URL for liveness checks. Returns @@ -440,10 +450,13 @@ async def internal_root() -> Metadata: JSON response with a list of instruments, observation types, and data types. """ - return get_metadata( - package_name="consdb-pqserver", - application_name=config.name, - ) + return { + "instruments": [ "foo", "bar", "baz" ], + } +# get_metadata( +# package_name="consdb-pqserver", +# application_name=config.name, +# ) class Index(BaseModel): @@ -608,7 +621,7 @@ class GenericResponse(BaseModel): @external_router.post("/flex/{instrument}/{obs_type}/obs/{obs_id}") -def insert_flexible_metadata( +async def insert_flexible_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, obs_id: ObservationIdType, @@ -659,7 +672,7 @@ def insert_flexible_metadata( @external_router.post("/insert/{instrument}/{table}/obs/{obs_id}") -def insert( +async def insert( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, obs_id: ObservationIdType, @@ -703,7 +716,7 @@ def insert( @external_router.post("/insert/{instrument}/{table}") -def insert_multiple( +async def insert_multiple( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, ) -> dict[str, Any] | tuple[dict[str, str], int]: @@ -779,7 +792,7 @@ def insert_multiple( @external_router.get("/query/{instrument}/{obs_type}/obs/{obs_id}") -def get_all_metadata( +async def get_all_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, obs_id: ObservationIdType, @@ -828,7 +841,7 @@ def get_all_metadata( @external_router.post("/query") -def query() -> dict[str, Any] | tuple[dict[str, str], int]: +async def query() -> dict[str, Any] | tuple[dict[str, str], int]: """Query the ConsDB database. Parameters @@ -866,7 +879,7 @@ def query() -> dict[str, Any] | tuple[dict[str, str], int]: @external_router.get("/schema") -def list_instruments() -> list[str]: +async def list_instruments() -> list[str]: """Retrieve the list of instruments available in ConsDB.""" global instrument_tables @@ -875,7 +888,7 @@ def list_instruments() -> list[str]: @external_router.get("/consdb/schema/{instrument}") -def list_table( +async def list_table( instrument: Annotated[str, Depends(validate_instrument_name)], ) -> list[str]: """Retrieve the list of tables for an instrument.""" @@ -887,7 +900,7 @@ def list_table( @external_router.get("/schema/{instrument}/
") -def schema(instrument: Annotated[str, Depends(validate_instrument_name)], table: str) -> dict[str, list[str]]: +async def schema(instrument: Annotated[str, Depends(validate_instrument_name)], table: str) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. Parameters diff --git a/tests/test_pqserver.py b/tests/test_pqserver.py index a26fe540..2bb0a82b 100644 --- a/tests/test_pqserver.py +++ b/tests/test_pqserver.py @@ -4,12 +4,14 @@ import tempfile from pathlib import Path +from fastapi.testclient import TestClient + import pytest from requests import Response def _assert_http_status(response: Response, status: int): - assert response.status_code == status, f"{response.status_code} {response.json}" + assert response.status_code == status, f"{response.status_code} {response.json()}" @pytest.fixture @@ -60,12 +62,12 @@ def app(db, scope="module"): @pytest.fixture def client(app, scope="module"): # NOTE: all tests share the same client, app, and database. - return app.test_client() + return TestClient(app) def test_root(client): response = client.get("/") - result = response.json + result = response.json() assert "instruments" in result assert "obs_types" in result assert "dtypes" in result @@ -73,7 +75,7 @@ def test_root(client): def test_root2(client): response = client.get("/consdb") - result = response.json + result = response.json() assert "instruments" in result assert "latiss" in result["instruments"] assert "obs_types" in result @@ -88,7 +90,7 @@ def test_flexible_metadata(client): json={"key": "foo", "dtype": "bool", "doc": "bool key"}, ) _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == { "message": "Key added to flexible metadata", "key": "foo", @@ -101,7 +103,7 @@ def test_flexible_metadata(client): json={"key": "bar", "dtype": "int", "doc": "int key"}, ) _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == { "message": "Key added to flexible metadata", "key": "bar", @@ -114,7 +116,7 @@ def test_flexible_metadata(client): json={"key": "baz", "dtype": "float", "doc": "float key"}, ) _assert_http_status(response, 200) - result = response.json + result = response.json() assert result["obs_type"] == "Exposure" response = client.post( @@ -122,7 +124,7 @@ def test_flexible_metadata(client): json={"key": "quux", "dtype": "str", "doc": "str key"}, ) _assert_http_status(response, 404) - result = response.json + result = response.json() assert result == { "message": "Unknown instrument", "value": "bad_instrument", @@ -131,7 +133,7 @@ def test_flexible_metadata(client): response = client.get("/consdb/flex/latiss/exposure/schema") _assert_http_status(response, 200) - result = response.json + result = response.json() assert "foo" in result assert "bar" in result assert "baz" in result @@ -142,7 +144,7 @@ def test_flexible_metadata(client): json={"values": {"foo": True, "bar": 42, "baz": 3.14159}}, ) _assert_http_status(response, 200) - result = response.json + result = response.json() assert result["message"] == "Flexible metadata inserted" assert result["obs_id"] == 2024032100002 @@ -151,7 +153,7 @@ def test_flexible_metadata(client): json={"values": {"foo": True, "bar": 42, "baz": 3.14159}}, ) _assert_http_status(response, 500) - result = response.json + result = response.json() assert "UNIQUE" in result["message"] response = client.post( @@ -159,18 +161,18 @@ def test_flexible_metadata(client): json={"values": {"bad_key": 2.71828}}, ) _assert_http_status(response, 404) - result = response.json + result = response.json() assert result["message"] == "Unknown key" assert result["value"] == "bad_key" response = client.get("/consdb/flex/latiss/exposure/obs/2024032100002") _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == {"foo": True, "bar": 42, "baz": 3.14159} response = client.get("/consdb/flex/latiss/exposure/obs/2024032100002?k=bar&k=baz") _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == {"bar": 42, "baz": 3.14159} response = client.post( @@ -178,22 +180,22 @@ def test_flexible_metadata(client): json={"values": {"foo": False, "bar": 34, "baz": 2.71828}}, ) _assert_http_status(response, 200) - result = response.json + result = response.json() assert result["message"] == "Flexible metadata inserted" response = client.get("/consdb/flex/latiss/exposure/obs/2024032100002") _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == {"foo": False, "bar": 34, "baz": 2.71828} response = client.get("/consdb/flex/latiss/exposure/obs/2024032100002?k=baz") _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == {"baz": 2.71828} response = client.post("/consdb/flex/latiss/exposure/obs/2024032100002", json={}) _assert_http_status(response, 404) - result = response.json + result = response.json() assert "Invalid JSON" in result["message"] assert result["required_keys"] == ["values"] @@ -209,7 +211,7 @@ def test_flexible_metadata(client): }, ) _assert_http_status(response, 200) - result = response.json + result = response.json() assert result == { "message": "Data inserted", "table": "cdb_latiss.exposure", @@ -219,7 +221,7 @@ def test_flexible_metadata(client): response = client.post("/consdb/query", json={"query": "SELECT * FROM exposure ORDER BY day_obs;"}) _assert_http_status(response, 200) - result = response.json + result = response.json() assert len(result) == 2 assert "exposure_id" in result["columns"] assert 20240321 in result["data"][0] From a09c3713915a0c095363b5311fc575005e04c015 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 09:34:52 -0700 Subject: [PATCH 04/13] Lose the routers; pqserver doesn't really need them anyway --- python/lsst/consdb/pqserver.py | 46 ++++++++++++++-------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index de030042..ac76322f 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -23,7 +23,8 @@ from importlib.metadata import metadata, version from typing import Annotated, Any, Iterable, Optional -from fastapi import FastAPI, APIRouter, Depends, Path +import astropy +from fastapi import Depends, HTTPException, FastAPI, Path import sqlalchemy import sqlalchemy.dialects.postgresql from pydantic import BaseModel, Field, field_validator @@ -31,10 +32,6 @@ from safir.middleware.x_forwarded import XForwardedMiddleware from .utils import setup_logging, setup_postgres -internal_router = APIRouter() -external_router = APIRouter() - - class ObsTypeEnum(str, Enum): EXPOSURE = "exposure" VISIT1 = "visit1" @@ -120,11 +117,6 @@ def validate_instrument_name( engine = setup_postgres() logger = setup_logging(__name__) -app.include_router(internal_router) -app.include_router(external_router, prefix=path_prefix) -app.add_middleware(XForwardedMiddleware) - - ######################## # Schema preload class # ######################## @@ -467,8 +459,8 @@ class Index(BaseModel): dtypes: list[str] = Field(..., title="Allowed data types in flexible metadata") -@external_router.get( - "/", +@app.get( + "/consdb/", description="Application root", response_model=Index, response_model_exclude_none=True, @@ -519,8 +511,8 @@ class AddKeyResponseModel(BaseModel): obs_type: ObsTypeEnum = Field(..., title="The observation type that owns the new key") -@external_router.post( - "/flex/{instrument}/{obs_type}/addkey", +@app.post( + "/consdb/flex/{instrument}/{obs_type}/addkey", summary="Add a flexible metadata key for the specified instrument and obs_type.", response_model=AddKeyResponseModel, ) @@ -532,7 +524,7 @@ async def add_flexible_metadata_key( """Add a key to a flexible metadata table.""" global instrument_tables - logger.info(f"{request} {request.json}") + logger.info(f"{data.key} {data.dtype}") info = _check_json(request.json, "flex addkey", ("key", "dtype", "doc")) schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type) stmt = sqlalchemy.insert(schema_table).values( @@ -561,8 +553,8 @@ async def add_flexible_metadata_key( ) -@external_router.get( - "/flex/{instrument}/{obs_type}/schema", +@app.get( + "/consdb/flex/{instrument}/{obs_type}/schema", description="Flex schema for the given instrument and observation type.", ) async def get_flexible_metadata_keys( @@ -580,8 +572,8 @@ async def get_flexible_metadata_keys( return instrument_tables.flexible_metadata_schemas[instrument][obs_type] -@external_router.get( - "/flex/{instrument}/{obs_type}/obs/{obs_id}", +@app.get( + "/consdb/flex/{instrument}/{obs_type}/obs/{obs_id}", description="Flex schema for the given instrument and observation type.", ) async def get_flexible_metadata( @@ -620,7 +612,7 @@ class GenericResponse(BaseModel): table: Optional[str] = Field(..., title="Table name") -@external_router.post("/flex/{instrument}/{obs_type}/obs/{obs_id}") +@app.post("/consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") async def insert_flexible_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, @@ -671,7 +663,7 @@ async def insert_flexible_metadata( ) -@external_router.post("/insert/{instrument}/{table}/obs/{obs_id}") +@app.post("/consdb/insert/{instrument}/{table}/obs/{obs_id}") async def insert( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, @@ -715,7 +707,7 @@ async def insert( ) -@external_router.post("/insert/{instrument}/{table}") +@app.post("/consdb/insert/{instrument}/{table}") async def insert_multiple( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, @@ -791,7 +783,7 @@ async def insert_multiple( ) -@external_router.get("/query/{instrument}/{obs_type}/obs/{obs_id}") +@app.get("/consdb/query/{instrument}/{obs_type}/obs/{obs_id}") async def get_all_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, @@ -840,7 +832,7 @@ async def get_all_metadata( return result -@external_router.post("/query") +@app.post("/consdb/query") async def query() -> dict[str, Any] | tuple[dict[str, str], int]: """Query the ConsDB database. @@ -878,7 +870,7 @@ async def query() -> dict[str, Any] | tuple[dict[str, str], int]: return result -@external_router.get("/schema") +@app.get("/consdb/schema") async def list_instruments() -> list[str]: """Retrieve the list of instruments available in ConsDB.""" global instrument_tables @@ -887,7 +879,7 @@ async def list_instruments() -> list[str]: return instrument_tables.instrument_list -@external_router.get("/consdb/schema/{instrument}") +@app.get("/consdb/schema/{instrument}") async def list_table( instrument: Annotated[str, Depends(validate_instrument_name)], ) -> list[str]: @@ -899,7 +891,7 @@ async def list_table( return list(schema.tables.keys()) -@external_router.get("/schema/{instrument}/
") +@app.get("/consdb/schema/{instrument}/
") async def schema(instrument: Annotated[str, Depends(validate_instrument_name)], table: str) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. From f61aae81ca9c387ae1001aa7263a6410b490c349 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 16:40:29 -0700 Subject: [PATCH 05/13] Fix broken tests --- python/lsst/consdb/pqserver.py | 389 +++++++++++++++------------------ tests/test_pqserver.py | 19 +- 2 files changed, 180 insertions(+), 228 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index ac76322f..5889b7c5 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -20,18 +20,18 @@ # along with this program. If not, see . from enum import Enum -from importlib.metadata import metadata, version from typing import Annotated, Any, Iterable, Optional import astropy -from fastapi import Depends, HTTPException, FastAPI, Path +from fastapi import Body, Depends, HTTPException, FastAPI, Path, Query, Request, status +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse import sqlalchemy import sqlalchemy.dialects.postgresql from pydantic import BaseModel, Field, field_validator -from safir.metadata import Metadata, get_metadata -from safir.middleware.x_forwarded import XForwardedMiddleware from .utils import setup_logging, setup_postgres + class ObsTypeEnum(str, Enum): EXPOSURE = "exposure" VISIT1 = "visit1" @@ -40,7 +40,9 @@ class ObsTypeEnum(str, Enum): @classmethod def _missing_(cls, value): - """Makes the enum case-insensitive, see https://docs.python.org/3/library/enum.html""" + """Makes the enum case-insensitive, see + https://docs.python.org/3/library/enum.html + """ value = value.lower() for member in cls: if member.value == value: @@ -50,24 +52,30 @@ def _missing_(cls, value): ObservationIdType = int + +# This shenanigan makes flake8 recognize AllowedFlexTypeEnum as a type. +class AllowedFlexTypeEnum(Enum): + pass + + AllowedFlexType = bool | int | float | str -AllowedFlexTypeEnum = Enum( - "AllowedFlexTypeEnum", {t.__name__.upper(): t.__name__ for t in AllowedFlexType.__args__} +AllowedFlexTypeEnumBase = Enum( + "AllowedFlexTypeEnumBase", {t.__name__.upper(): t.__name__ for t in AllowedFlexType.__args__} ) +AllowedFlexTypeEnum = AllowedFlexTypeEnumBase def convert_to_flex_type(ty: AllowedFlexTypeEnum, v: str) -> AllowedFlexType: - """Converts a string containing a flex database value into the appropriate type. + """Converts a string containing a flex database value into the + appropriate type. Raises ====== - RuntimeError if ty does not match an allowed flex type - ValueError if the conversion is invalid """ if ty.value == "bool": # Special case return v.lower() in ("true", "t", "1") - m = [t for t in AllowedFlexType.__args__ if t.__name__ == ty] + m = [t for t in AllowedFlexType.__args__ if t.__name__ == ty.value] assert len(m) == 1 return m[0](v) @@ -283,10 +291,6 @@ def compute_wide_view_name(self, instrument: str, obs_type: str) -> str: view_nae: `str` Name of the appropriate wide view. """ - instrument = instrument.lower() - obs_type = obs_type.lower() - if instrument not in self.schemas: - raise BadValueException("instrument", instrument, list(self.schemas.keys())) view_name = f"cdb_{instrument}.{obs_type}_wide_view" if view_name not in self.schemas[instrument].tables: obs_type_list = [ @@ -306,62 +310,6 @@ def compute_wide_view_name(self, instrument: str, obs_type: str) -> str: ################## -class BadJsonException(Exception): - """Exception raised for invalid JSON. - - Reports the list of required keys. - - Parameters - ---------- - method: `str` - Name of the method being invoked. - keys: `Iterable` [ `str` ] - List of keys required in the JSON object. - """ - - status_code = 404 - - def __init__(self, method: str, keys: Iterable[str]): - self.method = method - self.keys = keys - - def to_dict(self) -> dict[str, Any]: - """Convert the exception to a dictionary for JSON conversion. - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - Dictionary with a message and list of required keys. - """ - data = { - "message": f"Invalid JSON for {self.method}", - "required_keys": self.keys, - } - return data - - -def _check_json(json: dict | None, method: str, keys: Iterable[str]) -> dict[str, Any]: - """Check a JSON object for the presence of required keys. - - Parameters - ---------- - json: `dict` - The decoded JSON object. - method: `str` - The name of the Web service method being invoked. - keys: `Iterable` [ `str` ] - The keys required to be in the JSON object. - - Raises - ------ - BadJsonException - Raised if any key is missing. - """ - if not json or any(x not in json for x in keys): - raise BadJsonException(method, keys) - return json - - class BadValueException(Exception): """Exception raised for an invalid value. @@ -402,38 +350,48 @@ def to_dict(self) -> dict[str, Any]: return data -''' -@app.errorhandler(BadJsonException) -def handle_bad_json(e: BadJsonException) -> tuple[dict[str, Any], int]: - """Handle a BadJsonException by returning its content as JSON.""" - return e.to_dict(), e.status_code - - -@app.errorhandler(BadValueException) -def handle_bad_value(e: BadValueException) -> tuple[dict[str, Any], int]: - """Handle a BadValueException by returning its content as JSON.""" - return e.to_dict(), e.status_code - +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + logger.error(f"RequestValidationError {request}: {exc_str}") + content = {"message": "Validation error", "detail": exc.errors()} + return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY) -@app.errorhandler(sqlalchemy.exc.SQLAlchemyError) -def handle_sql_error(e: sqlalchemy.exc.SQLAlchemyError) -> tuple[dict[str, str], int]: - """Handle a SQLAlchemyError by returning its content as JSON.""" - return {"message": str(e)}, 500 -''' +@app.exception_handler(BadValueException) +async def bad_value_exception_handler(request: Request, exc: BadValueException): + exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + logger.error(f"BadValueException {request}: {exc_str}") + return JSONResponse(content=exc.to_dict(), status_code=status.HTTP_404_NOT_FOUND) +@app.exception_handler(sqlalchemy.exc.SQLAlchemyError) +async def sqlalchemy_exception_handler(request: Request, exc: sqlalchemy.exc.SQLAlchemyError): + exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + logger.error(f"SQLAlchemyError {request}: {exc_str}") + content={"message": str(exc)} + return JSONResponse(content=content, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) ################################### # Web service application methods # ################################### +class IndexResponseModel(BaseModel): + """Metadata returned by the external root URL.""" + + instruments: list[str] = Field(..., title="Available instruments") + obs_types: list[str] = Field(..., title="Available observation types") + dtypes: list[str] = Field(..., title="Allowed data types in flexible metadata") + + @app.get( "/", description="Metadata and health check endpoint.", include_in_schema=False, + response_model=IndexResponseModel, + response_model_exclude_none=True, summary="Application metadata", ) -async def internal_root() -> dict[str, Any]: +async def internal_root() -> IndexResponseModel: """Root URL for liveness checks. Returns @@ -442,55 +400,46 @@ async def internal_root() -> dict[str, Any]: JSON response with a list of instruments, observation types, and data types. """ - return { - "instruments": [ "foo", "bar", "baz" ], - } -# get_metadata( -# package_name="consdb-pqserver", -# application_name=config.name, -# ) - - -class Index(BaseModel): - """Metadata returned by the external root URL.""" - - instruments: list[str] = Field(..., title="Available instruments") - obs_types: list[str] = Field(..., title="Available observation types") - dtypes: list[str] = Field(..., title="Allowed data types in flexible metadata") + logger.info("GET /") + return IndexResponseModel( + instruments=instrument_tables.instrument_list, + obs_types=[o.value for o in ObsTypeEnum], + dtypes=[d.value for d in AllowedFlexTypeEnum], + ) @app.get( "/consdb/", description="Application root", - response_model=Index, + response_model=IndexResponseModel, response_model_exclude_none=True, summary="Application root", ) -async def external_root() -> Index: +async def external_root() -> IndexResponseModel: """Application root URL /consdb/.""" global instrument_tables - logger.info(request) - return Index( + logger.info("GET /consdb") + return IndexResponseModel( instruments=instrument_tables.instrument_list, obs_types=[o.value for o in ObsTypeEnum], - dtypes=[d.value for d in AllowedTypesEnum], + dtypes=[d.value for d in AllowedFlexTypeEnum], ) class AddKeyRequestModel(BaseModel): key: str = Field(..., title="The name of the added key") dtype: AllowedFlexTypeEnum = Field(..., title="Data type for the added key") - doc: str = Field(..., title="Documentation string for the new key") - unit: Optional[str] = Field(..., title="Unit for value") - ucd: Optional[str] = Field( - ..., title="IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/)" + doc: Optional[str] = Field("", title="Documentation string for the new key") + unit: Optional[str | None] = Field(None, title="Unit for value") + ucd: Optional[str | None] = Field( + None, title="IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/)" ) @field_validator("unit") def validate_unit(v): try: - unit = astropy.units.Unit(v) + _ = astropy.units.Unit(v) except ValueError: raise ValueError(f"'{v}' is a not a valid unit.") return v @@ -525,11 +474,10 @@ async def add_flexible_metadata_key( global instrument_tables logger.info(f"{data.key} {data.dtype}") - info = _check_json(request.json, "flex addkey", ("key", "dtype", "doc")) schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type) stmt = sqlalchemy.insert(schema_table).values( key=data.key, - dtype=data.dtype, + dtype=data.dtype.value, doc=data.doc, unit=data.unit, ucd=data.ucd, @@ -539,17 +487,17 @@ async def add_flexible_metadata_key( _ = conn.execute(stmt) conn.commit() # Update cached copy without re-querying database. - instrument_tables.flexible_metadata_schemas[instrument.lower()][obs_type.lower()][key] = [ - data.dtype, + instrument_tables.flexible_metadata_schemas[instrument.lower()][obs_type.lower()][data.key] = [ + data.dtype.value, data.doc, data.unit, data.ucd, ] - return AddKeyResponse( + return AddKeyResponseModel( message="Key added to flexible metadata", key=data.key, instrument=instrument, - obs_type=data.obs_type, + obs_type=obs_type, ) @@ -561,10 +509,12 @@ async def get_flexible_metadata_keys( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, ) -> dict[str, list[str | None]]: - """Returns the flex schema for the given instrument and observation type.""" + """Returns the flex schema for the given instrument and + observation type. + """ global instrument_tables - logger.info(request) + logger.info(f"GET /consdb/flex/{instrument}/{obs_type}/schema") instrument = instrument.lower() obs_type = obs_type.lower() _ = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) @@ -578,20 +528,20 @@ async def get_flexible_metadata_keys( ) async def get_flexible_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], - obs_type: ObsTypeEnum, - obs_id: ObservationIdType, + obs_type: ObsTypeEnum = Path(..., title="Observation type"), + obs_id: ObservationIdType = Path(..., title="Observation ID"), + k: list[str] = Query([], title="Columns to retrieve"), ) -> dict[str, AllowedFlexType]: """Retrieve values for an observation from a flexible metadata table.""" global instrument_tables - logger.info(request) + logger.info(f"GET /consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] result = dict() stmt = sqlalchemy.select(table.c["key", "value"]).where(table.c.obs_id == obs_id) - if request.args and "k" in request.args: - cols = request.args.getlist("k") - stmt = stmt.where(table.c.key.in_(cols)) + if len(k) > 0: + stmt = stmt.where(table.c.key.in_(k)) logger.debug(str(stmt)) with engine.connect() as conn: for row in conn.execute(stmt): @@ -600,16 +550,20 @@ async def get_flexible_metadata( instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] dtype = schema[key][0] - result[key] = convert_to_flex_type(AllowedFlexTypeEnum(key), value) + result[key] = convert_to_flex_type(AllowedFlexTypeEnum(dtype), value) return result -class GenericResponse(BaseModel): +class InsertDataModel(BaseModel): + """ This model can be used for either flex or regular data. """ + values: dict[str, AllowedFlexType] = Field(..., title="Data to insert or update") + + +class InsertFlexDataResponse(BaseModel): message: str = Field(..., title="Human-readable response message") instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") obs_type: ObsTypeEnum = Field(..., title="The observation type (e.g., ``exposure``)") obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") - table: Optional[str] = Field(..., title="Table name") @app.post("/consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") @@ -617,16 +571,17 @@ async def insert_flexible_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, obs_id: ObservationIdType, -) -> GenericResponse: + data: InsertDataModel = Body(..., title="Data to insert or update"), + u: Optional[int] = Query(0, title="Update if exists"), +) -> InsertFlexDataResponse: """Insert or update key/value pairs in a flexible metadata table.""" global instrument_tables - logger.info(f"{request} {request.json}") - info = _check_json(request.json, "flex obs", ("values",)) + logger.info(f"POST /consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] - value_dict = info["values"] + value_dict = data.values if any(key not in schema for key in value_dict): instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] @@ -642,68 +597,81 @@ async def insert_flexible_metadata( with engine.connect() as conn: for key, value in value_dict.items(): value_str = str(value) + stmt: sqlalchemy.sql.dml.Insert - if request.args and request.args.get("u") == "1": - stmt = ( - sqlalchemy.dialects.postgresql.insert(table) - .values(obs_id=obs_id, key=key, value=value_str) - .on_conflict_do_update(index_elements=["obs_id", "key"], set_={"value": value_str}) - ) - else: - stmt = sqlalchemy.insert(table).values(obs_id=obs_id, key=key, value=value_str) + stmt = sqlalchemy.dialects.postgresql.insert(table).values( + obs_id=obs_id, key=key, value=value_str + ) + logger.error(f"{u=}") + if u != 0: + stmt = stmt.on_conflict_do_update(index_elements=["obs_id", "key"], set_={"value": value_str}) + logger.debug(str(stmt)) _ = conn.execute(stmt) conn.commit() - return GenericResponse( + return InsertFlexDataResponse( message="Flexible metadata inserted", - obs_id=obs_id, instrument=instrument, obs_type=obs_type, + obs_id=obs_id, ) +class GenericResponse(BaseModel): + message: str = Field(..., title="Human-readable response message") + instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") + obs_type: ObsTypeEnum = Field(..., title="The observation type (e.g., ``exposure``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") + table: Optional[str] = Field(..., title="Table name") + +class InsertDataResponse(BaseModel): + message: str = Field(..., title="Human-readable response message") + instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") + table: Optional[str] = Field(..., title="Table name") + + @app.post("/consdb/insert/{instrument}/{table}/obs/{obs_id}") async def insert( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, obs_id: ObservationIdType, -) -> GenericResponse: + data: InsertDataModel = Body(..., title="Data to insert or update"), + u: Optional[int] = Query(0, title="Update if data already exist"), +) -> InsertDataResponse: """Insert or update column/value pairs in a ConsDB table.""" global instrument_tables - logger.info(f"{request} {request.json}") - instrument = instrument.lower() - if instrument not in instrument_tables.schemas: - raise BadValueException("instrument", instrument, list(instrument_tables.schemas.keys())) - info = _check_json(request.json, "insert", ("values",)) + logger.info(f"POST /consdb/insert/{instrument}/{table}/obs/{obs_id}") schema = f"cdb_{instrument}." table_name = table.lower() if not table.lower().startswith(schema): table_name = schema + table_name table_obj = instrument_tables.schemas[instrument].tables[table_name] - valdict = info["values"] + valdict = data.values obs_id_colname = instrument_tables.obs_id_column[instrument][table_name] valdict[obs_id_colname] = obs_id stmt: sqlalchemy.sql.dml.Insert - if request.args and request.args.get("u") == "1": - stmt = ( - sqlalchemy.dialects.postgresql.insert(table_obj) - .values(valdict) - .on_conflict_do_update(index_elements=[obs_id_colname], set_=valdict) - ) - else: - stmt = sqlalchemy.insert(table_obj).values(valdict) + stmt = sqlalchemy.dialects.postgresql.insert(table_obj).values(valdict) + if u != 0: + stmt = stmt.on_conflict_do_update(index_elements=[obs_id_colname], set_=valdict) logger.debug(str(stmt)) with engine.connect() as conn: _ = conn.execute(stmt) conn.commit() - return GenericResponse( + return InsertDataResponse( message="Data inserted", instrument=instrument, - table=table_name, obs_id=obs_id, + table=table_name, + ) + + +class InsertMultipleRequestModel(BaseModel): + obs_dict: dict[ObservationIdType, dict[str, AllowedFlexType]] = Field( + ..., title="Observation ID and key/value pairs to insert or update" ) @@ -711,26 +679,11 @@ async def insert( async def insert_multiple( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, + data: InsertMultipleRequestModel = Body(..., title="Data to insert or update"), + u: Optional[int] = Query(0, title="Update if data already exist"), ) -> dict[str, Any] | tuple[dict[str, str], int]: """Insert or update multiple observations in a ConsDB table. - Parameters - ---------- - instrument: `str` - Name of the instrument (e.g. ``LATISS``). - table: `str` - Name of table to insert into. - u: `str` - Allow update if set to "1" (URL query parameter). - obs_dict: `dict` [ `int`, `dict` [ `str`, `Any` ] ] - Dictionary of unique observation ids and key/value pairs to insert or - update (JSON POST data). - - Returns - ------- - json_dict: `dict` [ `str`, `Any` ] - JSON response with 200 HTTP status on success. - Raises ------ BadJsonException @@ -741,35 +694,24 @@ async def insert_multiple( """ global instrument_tables - logger.info(f"{request} {request.json}") - instrument = instrument.lower() - if instrument not in instrument_tables.schemas: - raise BadValueException("instrument", instrument, list(instrument_tables.schemas.keys())) - info = _check_json(request.json, "insert", ("obs_dict")) + logger.info(f"POST /consdb/insert/{instrument}/{table}") schema = f"cdb_{instrument}." table_name = table.lower() if not table.lower().startswith(schema): table_name = schema + table_name table_obj = instrument_tables.schemas[instrument].tables[table_name] - table_name = f"cdb_{instrument}." + info["table"].lower() + table_name = f"cdb_{instrument}." + table.lower() table = instrument_tables.schemas[instrument].tables[table_name] obs_id_colname = instrument_tables.obs_id_column[instrument][table_name] with engine.connect() as conn: - for obs_id, valdict in info["obs_dict"]: - if not isinstance(obs_id, ObservationIdType): - raise BadValueException("obs_id value", obs_id) + for obs_id, valdict in data.obs_dict.items(): valdict[obs_id_colname] = obs_id stmt: sqlalchemy.sql.dml.Insert - if request.args and request.args.get("u") == "1": - stmt = ( - sqlalchemy.dialects.postgresql.insert(table_obj) - .values(valdict) - .on_conflict_do_update(index_elements=[obs_id_colname], set_=valdict) - ) - else: - stmt = sqlalchemy.insert(table_obj).values(valdict) + stmt = sqlalchemy.dialects.postgresql.insert(table_obj).values(valdict) + if u != 0: + stmt = stmt.on_conflict_do_update(index_elements=[obs_id_colname], set_=valdict) logger.debug(str(stmt)) # TODO: optimize as executemany _ = conn.execute(stmt) @@ -779,7 +721,7 @@ async def insert_multiple( message="Data inserted", table=table_name, instrument=instrument, - obs_id=info["obs_dict"].keys(), + obs_id=data.obs_dict.keys(), ) @@ -788,6 +730,7 @@ async def get_all_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, obs_id: ObservationIdType, + flex: Optional[int] = Query(0, title="Include flexible metadata"), ) -> dict[str, Any]: """Get all information about an observation. @@ -813,7 +756,7 @@ async def get_all_metadata( """ global instrument_tables - logger.info(request) + logger.info(f"GET /consdb/query/{instrument}/{obs_type}/obs/{obs_id}") instrument = instrument.lower() obs_type = obs_type.lower() view_name = instrument_tables.compute_wide_view_name(instrument, obs_type) @@ -826,14 +769,25 @@ async def get_all_metadata( rows = conn.execute(stmt).all() assert len(rows) == 1 result = dict(rows[0]._mapping) - if request.args and "flex" in request.args and request.args["flex"] == "1": + if flex != 0: flex_result = get_flexible_metadata(instrument, obs_type, obs_id) result.update(flex_result) return result +class QueryRequestModel(BaseModel): + query: str = Field(..., title="SQL query string") + + +class QueryResponseModel(BaseModel): + columns: list[str] = Field(..., title="Column names") + data: list[Any] = Field(..., title="Data rows") + + @app.post("/consdb/query") -async def query() -> dict[str, Any] | tuple[dict[str, str], int]: +async def query( + data: QueryRequestModel = Body(..., title="SQL query string"), +) -> QueryResponseModel: """Query the ConsDB database. Parameters @@ -848,26 +802,25 @@ async def query() -> dict[str, Any] | tuple[dict[str, str], int]: Response is a dict with a ``columns`` key with value being a list of string column names and a ``data`` key with value being a list of rows. - - Raises - ------ - BadJsonException - Raised if JSON is absent or missing a required key. """ - logger.info(f"{request} {request.json}") - info = _check_json(request.json, "query", ("query",)) + logger.info("POST /consdb/query") + + columns = [] + rows = [] with engine.connect() as conn: - cursor = conn.exec_driver_sql(info["query"]) + cursor = conn.exec_driver_sql(data.query) first = True - result: dict[str, Any] = {} - rows = [] for row in cursor: + logger.info(row) if first: - result["columns"] = list(row._fields) + columns.extend(row._fields) first = False rows.append(list(row)) - result["data"] = rows - return result + + return QueryResponseModel( + columns=columns, + data=rows, + ) @app.get("/consdb/schema") @@ -875,7 +828,7 @@ async def list_instruments() -> list[str]: """Retrieve the list of instruments available in ConsDB.""" global instrument_tables - logger.info(request) + logger.info("GET /consdb/schema") return instrument_tables.instrument_list @@ -886,13 +839,15 @@ async def list_table( """Retrieve the list of tables for an instrument.""" global instrument_tables - logger.info(request) + logger.info(f"GET /consdb/schema/{instrument}") schema = instrument_tables.schemas[instrument] return list(schema.tables.keys()) @app.get("/consdb/schema/{instrument}/
") -async def schema(instrument: Annotated[str, Depends(validate_instrument_name)], table: str) -> dict[str, list[str]]: +async def schema( + instrument: Annotated[str, Depends(validate_instrument_name)], table: str +) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. Parameters @@ -916,7 +871,7 @@ async def schema(instrument: Annotated[str, Depends(validate_instrument_name)], """ global instrument_tables - logger.info(request) + logger.info("GET /consdb/schema/{instrument}/{table}") schema = instrument_tables.schemas[instrument] if not table.startswith(f"cdb_{instrument}."): table = f"cdb_{instrument}.{table}" diff --git a/tests/test_pqserver.py b/tests/test_pqserver.py index 2bb0a82b..0da9e5a5 100644 --- a/tests/test_pqserver.py +++ b/tests/test_pqserver.py @@ -107,7 +107,7 @@ def test_flexible_metadata(client): assert result == { "message": "Key added to flexible metadata", "key": "bar", - "instrument": "LATISS", + "instrument": "latiss", "obs_type": "exposure", } @@ -117,19 +117,15 @@ def test_flexible_metadata(client): ) _assert_http_status(response, 200) result = response.json() - assert result["obs_type"] == "Exposure" + assert result["obs_type"] == "exposure" response = client.post( "/consdb/flex/bad_instrument/exposure/addkey", json={"key": "quux", "dtype": "str", "doc": "str key"}, ) - _assert_http_status(response, 404) + _assert_http_status(response, 400) result = response.json() - assert result == { - "message": "Unknown instrument", - "value": "bad_instrument", - "valid": ["latiss"], - } + assert "Invalid instrument" in result["detail"] response = client.get("/consdb/flex/latiss/exposure/schema") _assert_http_status(response, 200) @@ -194,10 +190,11 @@ def test_flexible_metadata(client): assert result == {"baz": 2.71828} response = client.post("/consdb/flex/latiss/exposure/obs/2024032100002", json={}) - _assert_http_status(response, 404) + _assert_http_status(response, 422) result = response.json() - assert "Invalid JSON" in result["message"] - assert result["required_keys"] == ["values"] + assert "Validation error" in result["message"] + assert result["detail"][0]["type"] == "missing" + assert "values" in result["detail"][0]["loc"] response = client.post( "/consdb/insert/latiss/exposure/obs/2024032100003", From 1b0c4398252ac4b10b85bf0e72b8761e5bd7379f Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 16:44:23 -0700 Subject: [PATCH 06/13] lint and isort --- python/lsst/consdb/pqserver.py | 28 +++++++++++++++------------- tests/test_pqserver.py | 3 +-- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 5889b7c5..663b2145 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -20,15 +20,16 @@ # along with this program. If not, see . from enum import Enum -from typing import Annotated, Any, Iterable, Optional +from typing import Annotated, Any, Optional import astropy -from fastapi import Body, Depends, HTTPException, FastAPI, Path, Query, Request, status -from fastapi.exceptions import RequestValidationError -from fastapi.responses import JSONResponse import sqlalchemy import sqlalchemy.dialects.postgresql +from fastapi import Body, Depends, FastAPI, HTTPException, Path, Query, Request, status +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse from pydantic import BaseModel, Field, field_validator + from .utils import setup_logging, setup_postgres @@ -54,10 +55,6 @@ def _missing_(cls, value): # This shenanigan makes flake8 recognize AllowedFlexTypeEnum as a type. -class AllowedFlexTypeEnum(Enum): - pass - - AllowedFlexType = bool | int | float | str AllowedFlexTypeEnumBase = Enum( "AllowedFlexTypeEnumBase", {t.__name__.upper(): t.__name__ for t in AllowedFlexType.__args__} @@ -352,24 +349,27 @@ def to_dict(self) -> dict[str, Any]: @app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): - exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + exc_str = f"{exc}".replace("\n", " ").replace(" ", " ") logger.error(f"RequestValidationError {request}: {exc_str}") content = {"message": "Validation error", "detail": exc.errors()} return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY) + @app.exception_handler(BadValueException) async def bad_value_exception_handler(request: Request, exc: BadValueException): - exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + exc_str = f"{exc}".replace("\n", " ").replace(" ", " ") logger.error(f"BadValueException {request}: {exc_str}") return JSONResponse(content=exc.to_dict(), status_code=status.HTTP_404_NOT_FOUND) + @app.exception_handler(sqlalchemy.exc.SQLAlchemyError) async def sqlalchemy_exception_handler(request: Request, exc: sqlalchemy.exc.SQLAlchemyError): - exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + exc_str = f"{exc}".replace("\n", " ").replace(" ", " ") logger.error(f"SQLAlchemyError {request}: {exc_str}") - content={"message": str(exc)} + content = {"message": str(exc)} return JSONResponse(content=content, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) + ################################### # Web service application methods # ################################### @@ -555,7 +555,8 @@ async def get_flexible_metadata( class InsertDataModel(BaseModel): - """ This model can be used for either flex or regular data. """ + """This model can be used for either flex or regular data.""" + values: dict[str, AllowedFlexType] = Field(..., title="Data to insert or update") @@ -625,6 +626,7 @@ class GenericResponse(BaseModel): obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") table: Optional[str] = Field(..., title="Table name") + class InsertDataResponse(BaseModel): message: str = Field(..., title="Human-readable response message") instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") diff --git a/tests/test_pqserver.py b/tests/test_pqserver.py index 0da9e5a5..9904681d 100644 --- a/tests/test_pqserver.py +++ b/tests/test_pqserver.py @@ -4,9 +4,8 @@ import tempfile from pathlib import Path -from fastapi.testclient import TestClient - import pytest +from fastapi.testclient import TestClient from requests import Response From 8999914e64caef59be632d4033f2a8473e79954e Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 17:11:11 -0700 Subject: [PATCH 07/13] Add dependencies for pytest --- .github/workflows/pytest.yaml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index 90846b56..bf5e7a46 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -25,7 +25,7 @@ jobs: - name: Editable mode install run: | python -m pip install uv - uv pip install --system pytest pytest-cov pytest-html + uv pip install --system pytest pytest-cov pytest-html pytest-asyncio uv pip install --system -e . - name: Test with pytest run: | diff --git a/pyproject.toml b/pyproject.toml index da54bf0a..8cdc7cd5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "consdb" description = "consdb provides support for the Consolidated Database for the `Vera C. Rubin Observatory `_." license = { text = "GPL" } -dependencies = ["flask", "requests", "sqlalchemy"] +dependencies = ["fastapi", "requests", "sqlalchemy", "astropy"] readme = "README.rst" urls = { documentation = "https://consdb.lsst.io", source_code = "https://github.com/lsst-dm/consdb"} dynamic = ["version"] From 0dfe6611c9781594817276cd2cd428802f79330a Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 17:57:19 -0700 Subject: [PATCH 08/13] Expand on the documentation --- python/lsst/consdb/pqserver.py | 33 +++++++++++++++++++++++++-------- tests/test_pqserver.py | 1 + 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 663b2145..f2084fdb 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -462,8 +462,9 @@ class AddKeyResponseModel(BaseModel): @app.post( "/consdb/flex/{instrument}/{obs_type}/addkey", - summary="Add a flexible metadata key for the specified instrument and obs_type.", + summary="Add a flexible metadata key", response_model=AddKeyResponseModel, + description="Add a flexible metadata key for the specified instrument and obs_type.", ) async def add_flexible_metadata_key( instrument: Annotated[str, Depends(validate_instrument_name)], @@ -500,15 +501,18 @@ async def add_flexible_metadata_key( obs_type=obs_type, ) +class FlexMetadataSchemaResponseModel(BaseModel): + schema: dict[str, tuple[AllowedFlexTypeEnum, str, str | None, str | None]] = Field(..., title="Dictionary containing each flex key name and its associated data type, documentation, unit, and UCD") @app.get( "/consdb/flex/{instrument}/{obs_type}/schema", + summary="Get all flexible metadata keys", description="Flex schema for the given instrument and observation type.", ) async def get_flexible_metadata_keys( - instrument: Annotated[str, Depends(validate_instrument_name)], - obs_type: ObsTypeEnum, -) -> dict[str, list[str | None]]: + instrument: Annotated[str, Depends(validate_instrument_name)] = Path(..., title="Instrument name"), + obs_type: ObsTypeEnum = Path(..., title="Observation type"), +) -> FlexMetadataSchemaResponseModel: """Returns the flex schema for the given instrument and observation type. """ @@ -519,7 +523,10 @@ async def get_flexible_metadata_keys( obs_type = obs_type.lower() _ = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) - return instrument_tables.flexible_metadata_schemas[instrument][obs_type] + schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] + + return FlexMetadataSchemaResponseModel( + schema=instrument_tables.flexible_metadata_schemas[instrument][obs_type]) @app.get( @@ -634,7 +641,10 @@ class InsertDataResponse(BaseModel): table: Optional[str] = Field(..., title="Table name") -@app.post("/consdb/insert/{instrument}/{table}/obs/{obs_id}") +@app.post( + "/consdb/insert/{instrument}/{table}/obs/{obs_id}", + summary="Insert data row", +) async def insert( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, @@ -677,7 +687,10 @@ class InsertMultipleRequestModel(BaseModel): ) -@app.post("/consdb/insert/{instrument}/{table}") +@app.post( + "/consdb/insert/{instrument}/{table}", + summary="Insert multiple data rows", +) async def insert_multiple( instrument: Annotated[str, Depends(validate_instrument_name)], table: str, @@ -727,7 +740,11 @@ async def insert_multiple( ) -@app.get("/consdb/query/{instrument}/{obs_type}/obs/{obs_id}") +@app.get( + "/consdb/query/{instrument}/{obs_type}/obs/{obs_id}", + summary="Get all metadata", + description="Get all metadata for a given observation.", +) async def get_all_metadata( instrument: Annotated[str, Depends(validate_instrument_name)], obs_type: ObsTypeEnum, diff --git a/tests/test_pqserver.py b/tests/test_pqserver.py index 9904681d..c2375d8d 100644 --- a/tests/test_pqserver.py +++ b/tests/test_pqserver.py @@ -129,6 +129,7 @@ def test_flexible_metadata(client): response = client.get("/consdb/flex/latiss/exposure/schema") _assert_http_status(response, 200) result = response.json() + result = result["schema"] assert "foo" in result assert "bar" in result assert "baz" in result From 7a84fbf13b84db128ab90f9d56f68f5cc8fedad6 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Wed, 31 Jul 2024 18:00:18 -0700 Subject: [PATCH 09/13] lint and isort --- python/lsst/consdb/pqserver.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index f2084fdb..27dae173 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -501,8 +501,16 @@ async def add_flexible_metadata_key( obs_type=obs_type, ) + class FlexMetadataSchemaResponseModel(BaseModel): - schema: dict[str, tuple[AllowedFlexTypeEnum, str, str | None, str | None]] = Field(..., title="Dictionary containing each flex key name and its associated data type, documentation, unit, and UCD") + schema: dict[str, tuple[AllowedFlexTypeEnum, str, str | None, str | None]] = Field( + ..., + title=""" + Dictionary containing each flex key name + and its associated data type, documentation, unit, and UCD + """, + ) + @app.get( "/consdb/flex/{instrument}/{obs_type}/schema", @@ -523,10 +531,10 @@ async def get_flexible_metadata_keys( obs_type = obs_type.lower() _ = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) - schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] return FlexMetadataSchemaResponseModel( - schema=instrument_tables.flexible_metadata_schemas[instrument][obs_type]) + schema=instrument_tables.flexible_metadata_schemas[instrument][obs_type] + ) @app.get( From 558bcbed46ef4c2b8577fdb736a7ba2c875b1c92 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Fri, 2 Aug 2024 15:34:06 -0700 Subject: [PATCH 10/13] Changes in response to comments by @dhirving --- Dockerfile.pqserver | 3 +- python/lsst/consdb/pqserver.py | 228 ++++++++++++++++----------------- tests/test_pqserver.py | 2 +- 3 files changed, 112 insertions(+), 121 deletions(-) diff --git a/Dockerfile.pqserver b/Dockerfile.pqserver index 5fdae7db..ba25b98c 100644 --- a/Dockerfile.pqserver +++ b/Dockerfile.pqserver @@ -8,5 +8,4 @@ COPY python/lsst/consdb/__init__.py python/lsst/consdb/pqserver.py python/lsst/c # Expose the port. EXPOSE 8080 -ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8080", "-w", "2", "-k", "uvicorn.workers.UvicornWorker", "consdb_pq.pqserver:app" ] - +ENTRYPOINT [ "uvicorn", "consdb_pq.pqserver:app", "--host", "0.0.0.0", "--port", "8080" ] diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 27dae173..b1c7ec7c 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -19,21 +19,21 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from enum import Enum +from enum import StrEnum from typing import Annotated, Any, Optional import astropy import sqlalchemy import sqlalchemy.dialects.postgresql -from fastapi import Body, Depends, FastAPI, HTTPException, Path, Query, Request, status +from fastapi import Body, FastAPI, HTTPException, Path, Query, Request, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse -from pydantic import BaseModel, Field, field_validator +from pydantic import AfterValidator, BaseModel, Field, field_validator from .utils import setup_logging, setup_postgres -class ObsTypeEnum(str, Enum): +class ObsTypeEnum(StrEnum): EXPOSURE = "exposure" VISIT1 = "visit1" CCD_EXPOSURE = "ccdexposure" @@ -56,7 +56,7 @@ def _missing_(cls, value): # This shenanigan makes flake8 recognize AllowedFlexTypeEnum as a type. AllowedFlexType = bool | int | float | str -AllowedFlexTypeEnumBase = Enum( +AllowedFlexTypeEnumBase = StrEnum( "AllowedFlexTypeEnumBase", {t.__name__.upper(): t.__name__ for t in AllowedFlexType.__args__} ) AllowedFlexTypeEnum = AllowedFlexTypeEnumBase @@ -77,7 +77,7 @@ def convert_to_flex_type(ty: AllowedFlexTypeEnum, v: str) -> AllowedFlexType: return m[0](v) -class ObsIdColname(str, Enum): +class ObsIdColname(StrEnum): CCD_VISIT_ID = "ccdvisit_id" VISIT_ID = "visit_id" CCDEXPOSURE_ID = "ccdexposure_id" @@ -94,7 +94,7 @@ def _missing_(cls, value): def validate_instrument_name( - instrument: str = Path(..., description="Must be a valid instrument name (e.g., ``LATISS``)"), + instrument: str = Path(description="Must be a valid instrument name (e.g., ``LATISS``)"), ) -> str: global instrument_tables instrument_lower = instrument.lower() @@ -104,7 +104,9 @@ def validate_instrument_name( detail=f"Invalid instrument name {instrument}, must be one of " + ",".join(instrument_tables.instrument_list), ) - return instrument_lower + return instrument + +InstrumentName = Annotated[str, AfterValidator(validate_instrument_name)] #################### @@ -288,6 +290,8 @@ def compute_wide_view_name(self, instrument: str, obs_type: str) -> str: view_nae: `str` Name of the appropriate wide view. """ + instrument = instrument.lower() + obs_type = obs_type.lower() view_name = f"cdb_{instrument}.{obs_type}_wide_view" if view_name not in self.schemas[instrument].tables: obs_type_list = [ @@ -348,7 +352,7 @@ def to_dict(self) -> dict[str, Any]: @app.exception_handler(RequestValidationError) -async def validation_exception_handler(request: Request, exc: RequestValidationError): +def validation_exception_handler(request: Request, exc: RequestValidationError): exc_str = f"{exc}".replace("\n", " ").replace(" ", " ") logger.error(f"RequestValidationError {request}: {exc_str}") content = {"message": "Validation error", "detail": exc.errors()} @@ -356,14 +360,14 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE @app.exception_handler(BadValueException) -async def bad_value_exception_handler(request: Request, exc: BadValueException): +def bad_value_exception_handler(request: Request, exc: BadValueException): exc_str = f"{exc}".replace("\n", " ").replace(" ", " ") logger.error(f"BadValueException {request}: {exc_str}") return JSONResponse(content=exc.to_dict(), status_code=status.HTTP_404_NOT_FOUND) @app.exception_handler(sqlalchemy.exc.SQLAlchemyError) -async def sqlalchemy_exception_handler(request: Request, exc: sqlalchemy.exc.SQLAlchemyError): +def sqlalchemy_exception_handler(request: Request, exc: sqlalchemy.exc.SQLAlchemyError): exc_str = f"{exc}".replace("\n", " ").replace(" ", " ") logger.error(f"SQLAlchemyError {request}: {exc_str}") content = {"message": str(exc)} @@ -378,20 +382,18 @@ async def sqlalchemy_exception_handler(request: Request, exc: sqlalchemy.exc.SQL class IndexResponseModel(BaseModel): """Metadata returned by the external root URL.""" - instruments: list[str] = Field(..., title="Available instruments") - obs_types: list[str] = Field(..., title="Available observation types") - dtypes: list[str] = Field(..., title="Allowed data types in flexible metadata") + instruments: list[str] = Field(title="Available instruments") + obs_types: list[str] = Field(title="Available observation types") + dtypes: list[str] = Field(title="Allowed data types in flexible metadata") @app.get( "/", description="Metadata and health check endpoint.", include_in_schema=False, - response_model=IndexResponseModel, - response_model_exclude_none=True, summary="Application metadata", ) -async def internal_root() -> IndexResponseModel: +def internal_root() -> IndexResponseModel: """Root URL for liveness checks. Returns @@ -400,7 +402,6 @@ async def internal_root() -> IndexResponseModel: JSON response with a list of instruments, observation types, and data types. """ - logger.info("GET /") return IndexResponseModel( instruments=instrument_tables.instrument_list, obs_types=[o.value for o in ObsTypeEnum], @@ -411,15 +412,12 @@ async def internal_root() -> IndexResponseModel: @app.get( "/consdb/", description="Application root", - response_model=IndexResponseModel, - response_model_exclude_none=True, summary="Application root", ) -async def external_root() -> IndexResponseModel: +def external_root() -> IndexResponseModel: """Application root URL /consdb/.""" global instrument_tables - logger.info("GET /consdb") return IndexResponseModel( instruments=instrument_tables.instrument_list, obs_types=[o.value for o in ObsTypeEnum], @@ -428,11 +426,11 @@ async def external_root() -> IndexResponseModel: class AddKeyRequestModel(BaseModel): - key: str = Field(..., title="The name of the added key") - dtype: AllowedFlexTypeEnum = Field(..., title="Data type for the added key") + key: str = Field(title="The name of the added key") + dtype: AllowedFlexTypeEnum = Field(title="Data type for the added key") doc: Optional[str] = Field("", title="Documentation string for the new key") - unit: Optional[str | None] = Field(None, title="Unit for value") - ucd: Optional[str | None] = Field( + unit: Optional[str] = Field(None, title="Unit for value") + ucd: Optional[str] = Field( None, title="IVOA Unified Content Descriptor (https://www.ivoa.net/documents/UCD1+/)" ) @@ -454,28 +452,27 @@ def validate_ucd(v): class AddKeyResponseModel(BaseModel): """Response model for the addkey endpoint.""" - message: str = Field(..., title="Human-readable response message") - key: str = Field(..., title="The name of the added key") - instrument: str = (Depends(validate_instrument_name),) - obs_type: ObsTypeEnum = Field(..., title="The observation type that owns the new key") + message: str = Field(title="Human-readable response message") + key: str = Field(title="The name of the added key") + instrument: InstrumentName = Field(title="The instrument name") + obs_type: ObsTypeEnum = Field(title="The observation type that owns the new key") @app.post( "/consdb/flex/{instrument}/{obs_type}/addkey", summary="Add a flexible metadata key", - response_model=AddKeyResponseModel, description="Add a flexible metadata key for the specified instrument and obs_type.", ) -async def add_flexible_metadata_key( - instrument: Annotated[str, Depends(validate_instrument_name)], +def add_flexible_metadata_key( + instrument: InstrumentName, obs_type: ObsTypeEnum, data: AddKeyRequestModel, ) -> AddKeyResponseModel: """Add a key to a flexible metadata table.""" global instrument_tables - logger.info(f"{data.key} {data.dtype}") - schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type) + instrument_l = instrument.lower() + schema_table = instrument_tables.get_flexible_metadata_schema(instrument_l, obs_type) stmt = sqlalchemy.insert(schema_table).values( key=data.key, dtype=data.dtype.value, @@ -488,7 +485,7 @@ async def add_flexible_metadata_key( _ = conn.execute(stmt) conn.commit() # Update cached copy without re-querying database. - instrument_tables.flexible_metadata_schemas[instrument.lower()][obs_type.lower()][data.key] = [ + instrument_tables.flexible_metadata_schemas[instrument_l][obs_type.lower()][data.key] = [ data.dtype.value, data.doc, data.unit, @@ -503,12 +500,12 @@ async def add_flexible_metadata_key( class FlexMetadataSchemaResponseModel(BaseModel): - schema: dict[str, tuple[AllowedFlexTypeEnum, str, str | None, str | None]] = Field( - ..., + schema_: dict[str, tuple[AllowedFlexTypeEnum, str, str | None, str | None]] = Field( title=""" Dictionary containing each flex key name and its associated data type, documentation, unit, and UCD """, + alias="schema", ) @@ -517,23 +514,22 @@ class FlexMetadataSchemaResponseModel(BaseModel): summary="Get all flexible metadata keys", description="Flex schema for the given instrument and observation type.", ) -async def get_flexible_metadata_keys( - instrument: Annotated[str, Depends(validate_instrument_name)] = Path(..., title="Instrument name"), - obs_type: ObsTypeEnum = Path(..., title="Observation type"), +def get_flexible_metadata_keys( + instrument: InstrumentName = Path(title="Instrument name"), + obs_type: ObsTypeEnum = Path(title="Observation type"), ) -> FlexMetadataSchemaResponseModel: """Returns the flex schema for the given instrument and observation type. """ global instrument_tables - logger.info(f"GET /consdb/flex/{instrument}/{obs_type}/schema") instrument = instrument.lower() obs_type = obs_type.lower() _ = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) return FlexMetadataSchemaResponseModel( - schema=instrument_tables.flexible_metadata_schemas[instrument][obs_type] + schema=instrument_tables.flexible_metadata_schemas[instrument.lower()][obs_type] ) @@ -541,18 +537,18 @@ async def get_flexible_metadata_keys( "/consdb/flex/{instrument}/{obs_type}/obs/{obs_id}", description="Flex schema for the given instrument and observation type.", ) -async def get_flexible_metadata( - instrument: Annotated[str, Depends(validate_instrument_name)], - obs_type: ObsTypeEnum = Path(..., title="Observation type"), - obs_id: ObservationIdType = Path(..., title="Observation ID"), +def get_flexible_metadata( + instrument: InstrumentName, + obs_type: ObsTypeEnum = Path(title="Observation type"), + obs_id: ObservationIdType = Path(title="Observation ID"), k: list[str] = Query([], title="Columns to retrieve"), ) -> dict[str, AllowedFlexType]: """Retrieve values for an observation from a flexible metadata table.""" global instrument_tables - logger.info(f"GET /consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") - table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) - schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] + instrument_l = instrument.lower() + table = instrument_tables.get_flexible_metadata_table(instrument_l, obs_type) + schema = instrument_tables.flexible_metadata_schemas[instrument_l][obs_type] result = dict() stmt = sqlalchemy.select(table.c["key", "value"]).where(table.c.obs_id == obs_id) if len(k) > 0: @@ -562,8 +558,8 @@ async def get_flexible_metadata( for row in conn.execute(stmt): key, value = row if key not in schema: - instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) - schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] + instrument_tables.refresh_flexible_metadata_schema(instrument_l, obs_type) + schema = instrument_tables.flexible_metadata_schemas[instrument_l][obs_type] dtype = schema[key][0] result[key] = convert_to_flex_type(AllowedFlexTypeEnum(dtype), value) return result @@ -572,35 +568,35 @@ async def get_flexible_metadata( class InsertDataModel(BaseModel): """This model can be used for either flex or regular data.""" - values: dict[str, AllowedFlexType] = Field(..., title="Data to insert or update") + values: dict[str, AllowedFlexType] = Field(title="Data to insert or update") class InsertFlexDataResponse(BaseModel): - message: str = Field(..., title="Human-readable response message") - instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") - obs_type: ObsTypeEnum = Field(..., title="The observation type (e.g., ``exposure``)") - obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") + message: str = Field(title="Human-readable response message") + instrument: str = Field(title="Instrument name (e.g., ``LATISS``)") + obs_type: ObsTypeEnum = Field(title="The observation type (e.g., ``exposure``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(title="Observation ID") @app.post("/consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") -async def insert_flexible_metadata( - instrument: Annotated[str, Depends(validate_instrument_name)], +def insert_flexible_metadata( + instrument: InstrumentName, obs_type: ObsTypeEnum, obs_id: ObservationIdType, - data: InsertDataModel = Body(..., title="Data to insert or update"), + data: InsertDataModel = Body(title="Data to insert or update"), u: Optional[int] = Query(0, title="Update if exists"), ) -> InsertFlexDataResponse: """Insert or update key/value pairs in a flexible metadata table.""" global instrument_tables - logger.info(f"POST /consdb/flex/{instrument}/{obs_type}/obs/{obs_id}") - table = instrument_tables.get_flexible_metadata_table(instrument, obs_type) - schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] + instrument_l = instrument.lower() + table = instrument_tables.get_flexible_metadata_table(instrument_l, obs_type) + schema = instrument_tables.flexible_metadata_schemas[instrument_l][obs_type] value_dict = data.values if any(key not in schema for key in value_dict): - instrument_tables.refresh_flexible_metadata_schema(instrument, obs_type) - schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] + instrument_tables.refresh_flexible_metadata_schema(instrument_l, obs_type) + schema = instrument_tables.flexible_metadata_schemas[instrument_l][obs_type] for key, value in value_dict.items(): if key not in schema: raise BadValueException("key", key, list(schema.keys())) @@ -634,43 +630,35 @@ async def insert_flexible_metadata( ) -class GenericResponse(BaseModel): - message: str = Field(..., title="Human-readable response message") - instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") - obs_type: ObsTypeEnum = Field(..., title="The observation type (e.g., ``exposure``)") - obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") - table: Optional[str] = Field(..., title="Table name") - - class InsertDataResponse(BaseModel): - message: str = Field(..., title="Human-readable response message") - instrument: str = Field(..., title="Instrument name (e.g., ``LATISS``)") - obs_id: ObservationIdType | list[ObservationIdType] = Field(..., title="Observation ID") - table: Optional[str] = Field(..., title="Table name") + message: str = Field(title="Human-readable response message") + instrument: str = Field(title="Instrument name (e.g., ``LATISS``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(title="Observation ID") + table: str = Field(title="Table name") @app.post( "/consdb/insert/{instrument}/{table}/obs/{obs_id}", summary="Insert data row", ) -async def insert( - instrument: Annotated[str, Depends(validate_instrument_name)], +def insert( + instrument: InstrumentName, table: str, obs_id: ObservationIdType, - data: InsertDataModel = Body(..., title="Data to insert or update"), + data: InsertDataModel = Body(title="Data to insert or update"), u: Optional[int] = Query(0, title="Update if data already exist"), ) -> InsertDataResponse: """Insert or update column/value pairs in a ConsDB table.""" global instrument_tables - logger.info(f"POST /consdb/insert/{instrument}/{table}/obs/{obs_id}") - schema = f"cdb_{instrument}." + instrument_l = instrument.lower() + schema = f"cdb_{instrument_l}." table_name = table.lower() if not table.lower().startswith(schema): table_name = schema + table_name - table_obj = instrument_tables.schemas[instrument].tables[table_name] + table_obj = instrument_tables.schemas[instrument_l].tables[table_name] valdict = data.values - obs_id_colname = instrument_tables.obs_id_column[instrument][table_name] + obs_id_colname = instrument_tables.obs_id_column[instrument_l][table_name] valdict[obs_id_colname] = obs_id stmt: sqlalchemy.sql.dml.Insert @@ -691,20 +679,28 @@ async def insert( class InsertMultipleRequestModel(BaseModel): obs_dict: dict[ObservationIdType, dict[str, AllowedFlexType]] = Field( - ..., title="Observation ID and key/value pairs to insert or update" + title="Observation ID and key/value pairs to insert or update" ) +class InsertMultipleResponseModel(BaseModel): + message: str = Field(title="Human-readable response message") + instrument: str = Field(title="Instrument name (e.g., ``LATISS``)") + obs_type: ObsTypeEnum = Field(title="The observation type (e.g., ``exposure``)") + obs_id: ObservationIdType | list[ObservationIdType] = Field(title="Observation ID") + table: str = Field(title="Table name") + + @app.post( "/consdb/insert/{instrument}/{table}", summary="Insert multiple data rows", ) -async def insert_multiple( - instrument: Annotated[str, Depends(validate_instrument_name)], +def insert_multiple( + instrument: InstrumentName, table: str, - data: InsertMultipleRequestModel = Body(..., title="Data to insert or update"), + data: InsertMultipleRequestModel = Body(title="Data to insert or update"), u: Optional[int] = Query(0, title="Update if data already exist"), -) -> dict[str, Any] | tuple[dict[str, str], int]: +) -> InsertMultipleResponseModel: """Insert or update multiple observations in a ConsDB table. Raises @@ -717,15 +713,15 @@ async def insert_multiple( """ global instrument_tables - logger.info(f"POST /consdb/insert/{instrument}/{table}") - schema = f"cdb_{instrument}." + instrument_l = instrument.lower() + schema = f"cdb_{instrument_l}." table_name = table.lower() if not table.lower().startswith(schema): table_name = schema + table_name - table_obj = instrument_tables.schemas[instrument].tables[table_name] - table_name = f"cdb_{instrument}." + table.lower() - table = instrument_tables.schemas[instrument].tables[table_name] - obs_id_colname = instrument_tables.obs_id_column[instrument][table_name] + table_obj = instrument_tables.schemas[instrument_l].tables[table_name] + table_name = f"cdb_{instrument_l}." + table.lower() + table = instrument_tables.schemas[instrument_l].tables[table_name] + obs_id_colname = instrument_tables.obs_id_column[instrument_l][table_name] with engine.connect() as conn: for obs_id, valdict in data.obs_dict.items(): @@ -740,7 +736,7 @@ async def insert_multiple( _ = conn.execute(stmt) conn.commit() - return GenericResponse( + return InsertMultipleResponseModel( message="Data inserted", table=table_name, instrument=instrument, @@ -753,8 +749,8 @@ async def insert_multiple( summary="Get all metadata", description="Get all metadata for a given observation.", ) -async def get_all_metadata( - instrument: Annotated[str, Depends(validate_instrument_name)], +def get_all_metadata( + instrument: InstrumentName, obs_type: ObsTypeEnum, obs_id: ObservationIdType, flex: Optional[int] = Query(0, title="Include flexible metadata"), @@ -783,7 +779,6 @@ async def get_all_metadata( """ global instrument_tables - logger.info(f"GET /consdb/query/{instrument}/{obs_type}/obs/{obs_id}") instrument = instrument.lower() obs_type = obs_type.lower() view_name = instrument_tables.compute_wide_view_name(instrument, obs_type) @@ -803,17 +798,17 @@ async def get_all_metadata( class QueryRequestModel(BaseModel): - query: str = Field(..., title="SQL query string") + query: str = Field(title="SQL query string") class QueryResponseModel(BaseModel): - columns: list[str] = Field(..., title="Column names") - data: list[Any] = Field(..., title="Data rows") + columns: list[str] = Field(title="Column names") + data: list[Any] = Field(title="Data rows") @app.post("/consdb/query") -async def query( - data: QueryRequestModel = Body(..., title="SQL query string"), +def query( + data: QueryRequestModel = Body(title="SQL query string"), ) -> QueryResponseModel: """Query the ConsDB database. @@ -830,7 +825,6 @@ async def query( of string column names and a ``data`` key with value being a list of rows. """ - logger.info("POST /consdb/query") columns = [] rows = [] @@ -838,7 +832,7 @@ async def query( cursor = conn.exec_driver_sql(data.query) first = True for row in cursor: - logger.info(row) + logger.debug(row) if first: columns.extend(row._fields) first = False @@ -851,29 +845,27 @@ async def query( @app.get("/consdb/schema") -async def list_instruments() -> list[str]: +def list_instruments() -> list[str]: """Retrieve the list of instruments available in ConsDB.""" global instrument_tables - logger.info("GET /consdb/schema") return instrument_tables.instrument_list @app.get("/consdb/schema/{instrument}") -async def list_table( - instrument: Annotated[str, Depends(validate_instrument_name)], +def list_table( + instrument: InstrumentName, ) -> list[str]: """Retrieve the list of tables for an instrument.""" global instrument_tables - logger.info(f"GET /consdb/schema/{instrument}") - schema = instrument_tables.schemas[instrument] + schema = instrument_tables.schemas[instrument.lower()] return list(schema.tables.keys()) @app.get("/consdb/schema/{instrument}/
") -async def schema( - instrument: Annotated[str, Depends(validate_instrument_name)], table: str +def schema( + instrument: InstrumentName, ) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. @@ -898,10 +890,10 @@ async def schema( """ global instrument_tables - logger.info("GET /consdb/schema/{instrument}/{table}") - schema = instrument_tables.schemas[instrument] - if not table.startswith(f"cdb_{instrument}."): - table = f"cdb_{instrument}.{table}" + instrument_l = instrument.lower() + schema = instrument_tables.schemas[instrument_l] + if not table.startswith(f"cdb_{instrument_l}."): + table = f"cdb_{instrument_l}.{table}" table = table.lower() if table not in schema.tables: raise BadValueException("table", table, list(schema.tables.keys())) diff --git a/tests/test_pqserver.py b/tests/test_pqserver.py index c2375d8d..95289b5a 100644 --- a/tests/test_pqserver.py +++ b/tests/test_pqserver.py @@ -106,7 +106,7 @@ def test_flexible_metadata(client): assert result == { "message": "Key added to flexible metadata", "key": "bar", - "instrument": "latiss", + "instrument": "LATISS", "obs_type": "exposure", } From 4bf464ff9210adf27850d8f22f62e7411a632ee2 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Fri, 2 Aug 2024 15:37:12 -0700 Subject: [PATCH 11/13] Lint and isort --- python/lsst/consdb/pqserver.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index b1c7ec7c..7168421b 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -106,6 +106,7 @@ def validate_instrument_name( ) return instrument + InstrumentName = Annotated[str, AfterValidator(validate_instrument_name)] @@ -863,9 +864,10 @@ def list_table( return list(schema.tables.keys()) -@app.get("/consdb/schema/{instrument}/
") +@app.get("/consdb/schema/{instrument}/{table}") def schema( instrument: InstrumentName, + table: str = Field(title="Table name to retrieve schema"), ) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. From 23b6677258ab2c949a6766d575a7943e630a3da7 Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Fri, 2 Aug 2024 15:40:32 -0700 Subject: [PATCH 12/13] Fix broken tests --- python/lsst/consdb/pqserver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index 7168421b..0aec75d1 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -866,8 +866,8 @@ def list_table( @app.get("/consdb/schema/{instrument}/{table}") def schema( - instrument: InstrumentName, - table: str = Field(title="Table name to retrieve schema"), + instrument: InstrumentName = Path(description="Instrument name"), + table: str = Path(description="Table name to retrieve schema"), ) -> dict[str, list[str]]: """Retrieve the descriptions of columns in a ConsDB table. From fb4015c44235c5d5013cca5231bef0ace054da9d Mon Sep 17 00:00:00 2001 From: Brian Brondel Date: Fri, 2 Aug 2024 15:42:57 -0700 Subject: [PATCH 13/13] Install httpx for pytest --- .github/workflows/pytest.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index bf5e7a46..32382a70 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -25,7 +25,7 @@ jobs: - name: Editable mode install run: | python -m pip install uv - uv pip install --system pytest pytest-cov pytest-html pytest-asyncio + uv pip install --system pytest pytest-cov pytest-html pytest-asyncio httpx uv pip install --system -e . - name: Test with pytest run: |