diff --git a/src/api/create.py b/src/api/create.py index 7a80b21..e774bea 100644 --- a/src/api/create.py +++ b/src/api/create.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd import pyarrow.parquet as pq +from psycopg2.extras import Json from sqlalchemy import MetaData, create_engine, text from sqlalchemy_utils.functions import ( create_database, @@ -17,7 +18,6 @@ ) from sqlmodel import SQLModel from tqdm import tqdm -from psycopg2.extras import Json # Do not remove. Sqlalchemy needs this import to create tables from . import models # noqa: F401 @@ -36,14 +36,16 @@ class Context(str, Enum): DQV = "http://www.w3.org/ns/dqv#" SDMX = "http://purl.org/linked-data/sdmx/2009/measure#" + base_context = { - "dct": Context.DCT, + "dct": Context.DCT, "schema": Context.SCHEMA, - "dqv": Context.DQV, - "sdmx-measure": Context.SDMX, - "dcat": Context.DCAT, + "dqv": Context.DQV, + "sdmx-measure": Context.SDMX, + "dcat": Context.DCAT, } + class URLType(Enum): """Enum type for different types of storage endpoint""" @@ -130,9 +132,8 @@ def create_cpf_summary(self, data_path: Path): dfs = [pd.read_parquet(path) for path in paths] df = pd.concat(dfs).reset_index(drop=True) df["context"] = [Json(base_context)] * len(df) - df = df.drop_duplicates(subset=['name']) + df = df.drop_duplicates(subset=["name"]) df.to_sql("cpf_summary", self.uri, if_exists="append") - def create_scenarios(self, data_path: Path): """Create the scenarios metadata table""" @@ -238,7 +239,15 @@ def create_sources(self, data_path: Path): + ".zarr/" + source_metadata["name"] ) - column_names = ["uuid", "shot_id", "name", "description", "quality", "url", "context"] + column_names = [ + "uuid", + "shot_id", + "name", + "description", + "quality", + "url", + "context", + ] source_metadata = source_metadata[column_names] source_metadata.to_sql("sources", self.uri, if_exists="append", index=False) diff --git a/src/api/crud.py b/src/api/crud.py index 2eb4ca9..0ed7fac 100644 --- a/src/api/crud.py +++ b/src/api/crud.py @@ -67,7 +67,7 @@ def apply_filters(query: Query, filters: str) -> Query: def apply_sorting(query: Query, sort: t.Optional[str] = None) -> Query: if sort is None: return query - + if sort.startswith("-"): sort = sort[1:] order = desc(column(sort)) diff --git a/src/api/main.py b/src/api/main.py index 0e2f569..c9a3e7c 100644 --- a/src/api/main.py +++ b/src/api/main.py @@ -1,15 +1,14 @@ import datetime import io +import json import os +import re import uuid from typing import List, Optional import pandas as pd import sqlmodel import ujson -import re -import json - from fastapi import ( Depends, FastAPI, @@ -22,8 +21,6 @@ from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse - - from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi_pagination import add_pagination @@ -36,12 +33,10 @@ from . import crud, graphql, models from .database import get_db -from .models import CPFSummaryModel, ScenarioModel, ShotModel, SignalModel, SourceModel templates = Jinja2Templates(directory="src/api/templates") - class JSONLDGraphQL(GraphQL): async def process_result( self, request: Request, result: ExecutionResult @@ -196,24 +191,24 @@ def __init__( class CustomJSONResponse(JSONResponse): """ - serializes the result of a database query (a dictionary) into a JSON-readable format + serializes the result of a database query (a dictionary) into a JSON-readable format """ - + media_type = "application/json" - - def render(self, content)-> bytes: + + def render(self, content) -> bytes: """ renders the output of the request """ content = self.convert_to_jsonld_terms(content) extracted_dict = {} edited_content = self.extract_meta_key(content, extracted_dict) - + # merge content with extracted context by placing context at the top merged_content = {**extracted_dict, **edited_content} - + return json.dumps(merged_content).encode() - + def convert_to_jsonld_terms(self, items): """ Replaces '__' with ':', and [A-Za-z_] with [@A-Za-z] in the mapping of terms (column names) to @@ -222,7 +217,6 @@ def convert_to_jsonld_terms(self, items): if not isinstance(items, dict): return items for key, val in list(items.items()): - # Recursive key modification if value is a dictionary or list object if isinstance(val, list): items[key] = [self.convert_to_jsonld_terms(item) for item in val] @@ -237,9 +231,9 @@ def convert_to_jsonld_terms(self, items): def extract_meta_key(self, content, extracted_dict): """ - Extract keys and values of @context and @type from the dictionary to - return them at the top of the dictionary as one entity for the whole dictionary, - rather than each for each item since they contain the same key and values + Extract keys and values of @context and @type from the dictionary to + return them at the top of the dictionary as one entity for the whole dictionary, + rather than each for each item since they contain the same key and values """ target_keys = ["@context", "@type", "dct:title"] for k, v in list(content.items()): @@ -255,7 +249,6 @@ def extract_meta_key(self, content, extracted_dict): self.extract_meta_key(item, extracted_dict) # return popped content return content - def apply_pagination( @@ -306,7 +299,7 @@ def query_aggregate( "/json/shots", description="Get information about experimental shots", response_model=CursorPage[models.ShotModel], - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_shots( db: Session = Depends(get_db), params: QueryParams = Depends() @@ -314,10 +307,11 @@ def get_shots( if params.sort is None: params.sort = "shot_id" - query = crud.select_query(models.ShotModel, params.fields, params.filters, params.sort) - - return paginate(db, query) + query = crud.select_query( + models.ShotModel, params.fields, params.filters, params.sort + ) + return paginate(db, query) @app.get("/json/shots/aggregate") @@ -335,7 +329,7 @@ def get_shots_aggregate( "/json/shots/{shot_id}", description="Get information about a single experimental shot", response_model=models.ShotModel, - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_shot(db: Session = Depends(get_db), shot_id: int = None) -> models.ShotModel: shot = crud.get_shot(shot_id) @@ -347,7 +341,7 @@ def get_shot(db: Session = Depends(get_db), shot_id: int = None) -> models.ShotM "/json/shots/{shot_id}/signals", description="Get information all signals for a single experimental shot", response_model=CursorPage[models.SignalModel], - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_signals_for_shot( db: Session = Depends(get_db), @@ -362,21 +356,26 @@ def get_signals_for_shot( # Get signals for this shot params.filters.append(f"shot_id$eq:{shot['shot_id']}") - query = crud.select_query(models.SignalModel, params.fields, params.filters, params.sort) + query = crud.select_query( + models.SignalModel, params.fields, params.filters, params.sort + ) return paginate(db, query) + @app.get( "/json/signals", description="Get information about specific signals.", response_model=CursorPage[models.SignalModel], - response_class=CustomJSONResponse - ) + response_class=CustomJSONResponse, +) def get_signals( db: Session = Depends(get_db), params: QueryParams = Depends() - ) -> CursorPage[models.SignalModel]: +) -> CursorPage[models.SignalModel]: if params.sort is None: params.sort = "uuid" - query = crud.select_query(models.SignalModel, params.fields, params.filters, params.sort) + query = crud.select_query( + models.SignalModel, params.fields, params.filters, params.sort + ) return paginate(db, query) @@ -397,23 +396,23 @@ def get_signals_aggregate( description="Get information about a single signal", response_model_exclude_unset=True, response_model=models.SignalModel, - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_signal( - db: Session = Depends(get_db), uuid_: uuid.UUID = None) -> models.SignalModel: + db: Session = Depends(get_db), uuid_: uuid.UUID = None +) -> models.SignalModel: signal = crud.get_signal(uuid_) signal = crud.execute_query_one(db, signal) return signal - @app.get( "/json/signals/{uuid_}/shot", description="Get information about the shot for a single signal", response_model_exclude_unset=True, response_model=models.ShotModel, - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_shot_for_signal( db: Session = Depends(get_db), uuid_: uuid.UUID = None @@ -429,7 +428,7 @@ def get_shot_for_signal( "/json/cpf_summary", description="Get descriptions of CPF summary variables.", response_model=CursorPage[models.CPFSummaryModel], - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_cpf_summary( db: Session = Depends(get_db), params: QueryParams = Depends() @@ -447,7 +446,7 @@ def get_cpf_summary( "/json/scenarios", description="Get information on different scenarios.", response_model=CursorPage[models.ScenarioModel], - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_scenarios( db: Session = Depends(get_db), params: QueryParams = Depends() @@ -455,7 +454,9 @@ def get_scenarios( if params.sort is None: params.sort = "id" - query = crud.select_query(models.ScenarioModel, params.fields, params.filters, params.sort) + query = crud.select_query( + models.ScenarioModel, params.fields, params.filters, params.sort + ) return paginate(db, query) @@ -463,7 +464,7 @@ def get_scenarios( "/json/sources", description="Get information on different sources.", response_model=CursorPage[models.SourceModel], - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_sources( db: Session = Depends(get_db), params: QueryParams = Depends() @@ -471,13 +472,17 @@ def get_sources( if params.sort is None: params.sort = "name" - query = crud.select_query(models.SourceModel, params.fields, params.filters, params.sort) + query = crud.select_query( + models.SourceModel, params.fields, params.filters, params.sort + ) return paginate(db, query) -@app.get("/json/sources/aggregate", - response_model=models.SourceModel, - response_class=CustomJSONResponse) +@app.get( + "/json/sources/aggregate", + response_model=models.SourceModel, + response_class=CustomJSONResponse, +) def get_sources_aggregate( request: Request, response: Response, @@ -492,7 +497,7 @@ def get_sources_aggregate( "/json/sources/{name}", description="Get information about a single signal", response_model=models.SourceModel, - response_class=CustomJSONResponse + response_class=CustomJSONResponse, ) def get_single_source( db: Session = Depends(get_db), name: str = None @@ -501,6 +506,7 @@ def get_single_source( source = db.execute(source).one()[0] return source + def ndjson_stream_query(db, query): STREAM_SIZE = 1000 offset = 0 diff --git a/src/api/models.py b/src/api/models.py index 00db352..df89782 100644 --- a/src/api/models.py +++ b/src/api/models.py @@ -1,26 +1,26 @@ -from typing import Optional, List, Dict +import datetime +import uuid as uuid_pkg +from typing import Dict, List, Optional + from sqlalchemy import ( + ARRAY, Column, + Enum, Integer, - ARRAY, Text, - Enum, ) from sqlalchemy.dialects.postgresql import JSONB -import datetime -import uuid as uuid_pkg +from sqlmodel import Field, Relationship, SQLModel from .types import ( + Comissioner, CurrentRange, DivertorConfig, - PlasmaShape, - Comissioner, Facility, - SignalType, + PlasmaShape, Quality, + SignalType, ) -from sqlmodel import Field, SQLModel, Relationship - class SignalModel(SQLModel, table=True): @@ -30,70 +30,77 @@ class SignalModel(SQLModel, table=True): sa_column=Column(JSONB), default={}, description="Context mapping vocabulary to IRIs", - alias= "context_") - - type: str = Field(sa_column_kwargs = {"server_default": "dcat:Dataset"}, - description="a structured set of data", alias="type_") + alias="context_", + ) - title: str = Field(sa_column_kwargs = {"server_default": "MASTU Signal dataset"}, - description="the title of the dataset", - alias="dct__title") + type: str = Field( + sa_column_kwargs={"server_default": "dcat:Dataset"}, + description="a structured set of data", + alias="type_", + ) + + title: str = Field( + sa_column_kwargs={"server_default": "MASTU Signal dataset"}, + description="the title of the dataset", + alias="dct__title", + ) uuid: uuid_pkg.UUID = Field( primary_key=True, default=None, description="UUID for a specific signal data", - alias="dct__identifier" + alias="dct__identifier", ) shot_id: int = Field( foreign_key="shots.shot_id", nullable=False, - description="ID of the shot this signal was produced by." + description="ID of the shot this signal was produced by.", ) name: str = Field( description="Human readable name of this specific signal. A combination of the signal type and the shot number e.g. AMC_PLASMA_CURRENT", - alias="schema__name") + alias="schema__name", + ) - version: int = Field(description="Version number of this dataset", - alias="schema__version") + version: int = Field( + description="Version number of this dataset", alias="schema__version" + ) rank: int = Field(description="Rank of the shape of this signal.") - url: str = Field(description="The URL for the location of this signal.", - alias="schema__url") + url: str = Field( + description="The URL for the location of this signal.", alias="schema__url" + ) source: str = Field( - description="Name of the source this signal belongs to.", - alias="dct__source") + description="Name of the source this signal belongs to.", alias="dct__source" + ) quality: Quality = Field( sa_column=Column( Enum(Quality, values_callable=lambda obj: [e.value for e in obj]) ), alias="dqv__QualityAnnotation", - description="Quality flag for this signal." + description="Quality flag for this signal.", ) shape: Optional[List[int]] = Field( sa_column=Column(ARRAY(Integer)), - description="Shape of each dimension of this signal. e.g. [10, 100, 3]" + description="Shape of each dimension of this signal. e.g. [10, 100, 3]", ) provenance: Optional[Dict] = Field( default={}, sa_column=Column(JSONB), - description="Information about the provenance graph that generated this signal in the PROV standard." + description="Information about the provenance graph that generated this signal in the PROV standard.", ) units: Optional[str] = Field( - description="The units of data contained within this dataset." ) description: str = Field( - sa_column=Column(Text), description="The description of the dataset." ) @@ -101,16 +108,17 @@ class SignalModel(SQLModel, table=True): sa_column=Column( Enum(SignalType, values_callable=lambda obj: [e.value for e in obj]) ), - description="The type of the signal dataset. e.g. 'Raw', 'Analysed'" + description="The type of the signal dataset. e.g. 'Raw', 'Analysed'", ) dimensions: Optional[List[str]] = Field( sa_column=Column(ARRAY(Text)), - description="The dimension names of the dataset, in order. e.g. ['time', 'radius']" + description="The dimension names of the dataset, in order. e.g. ['time', 'radius']", ) shot: "ShotModel" = Relationship(back_populates="signals") + class SourceModel(SQLModel, table=True): __tablename__ = "sources" @@ -118,21 +126,26 @@ class SourceModel(SQLModel, table=True): sa_column=Column(JSONB), default={}, description="Context mapping vocabulary to IRIs", - alias= "context_" + alias="context_", + ) + + type: str = Field( + sa_column_kwargs={"server_default": "dcat:Dataset"}, + description="a structured set of data", + alias="type_", + ) + + title: str = Field( + sa_column_kwargs={"server_default": "MASTU Source dataset"}, + description="the title of the dataset", + alias="dct__title", ) - type: str = Field(sa_column_kwargs = {"server_default": "dcat:Dataset"}, - description="a structured set of data", alias="type_") - - title: str = Field(sa_column_kwargs = {"server_default": "MASTU Source dataset"}, - description="the title of the dataset", - alias="dct__title") - uuid: uuid_pkg.UUID = Field( primary_key=True, default=None, description="UUID for a specific source data", - alias="dct__identifier", + alias="dct__identifier", ) shot_id: int = Field( @@ -142,17 +155,17 @@ class SourceModel(SQLModel, table=True): ) name: str = Field( - nullable=False, - description="Short name of the source.", - alias="schema__name" + nullable=False, description="Short name of the source.", alias="schema__name" ) - url: str = Field(description="The URL for the location of this source.", - alias="schema__url") + url: str = Field( + description="The URL for the location of this source.", alias="schema__url" + ) description: str = Field( - sa_column=Column(Text), description="Description of this source", - alias="dct__description" + sa_column=Column(Text), + description="Description of this source", + alias="dct__description", ) quality: Quality = Field( @@ -160,13 +173,12 @@ class SourceModel(SQLModel, table=True): Enum(Quality, values_callable=lambda obj: [e.value for e in obj]) ), description="Quality flag for this source.", - alias="dqv__QualityAnnotation" + alias="dqv__QualityAnnotation", ) shot: "ShotModel" = Relationship(back_populates="sources") - class CPFSummaryModel(SQLModel, table=True): __tablename__ = "cpf_summary" @@ -176,17 +188,30 @@ class CPFSummaryModel(SQLModel, table=True): sa_column=Column(JSONB), default={}, description="Context mapping vocabulary to IRIs", - alias= "context_") - - type: str = Field(sa_column_kwargs = {"server_default": "dcat:Dataset"}, - description="a structured set of data", alias="type_") - - title: str = Field(sa_column_kwargs = {"server_default": "MASTU CPF summary dataset"}, - description="the title of the dataset", - alias="dct__title") - - name: str = Field(sa_column=Column(Text), description="Name of the CPF variable.", alias="schema__name") - description: str = Field("Description of the CPF variable", alias="dct__description") + alias="context_", + ) + + type: str = Field( + sa_column_kwargs={"server_default": "dcat:Dataset"}, + description="a structured set of data", + alias="type_", + ) + + title: str = Field( + sa_column_kwargs={"server_default": "MASTU CPF summary dataset"}, + description="the title of the dataset", + alias="dct__title", + ) + + name: str = Field( + sa_column=Column(Text), + description="Name of the CPF variable.", + alias="schema__name", + ) + description: str = Field( + "Description of the CPF variable", alias="dct__description" + ) + class ScenarioModel(SQLModel, table=True): __tablename__ = "scenarios" @@ -195,34 +220,49 @@ class ScenarioModel(SQLModel, table=True): sa_column=Column(JSONB), default={}, description="Context mapping vocabulary to IRIs", - alias= "context_") - type: str = Field(sa_column_kwargs = {"server_default": "dcat:Dataset"}, - description="a structured set of data", alias="type_") - title: str = Field(sa_column_kwargs = {"server_default": "MASTU Scenario dataset"}, - description="the title of the dataset", - alias="dct__title") - - id: int = Field(primary_key=True, nullable=False, alias="dct__identifier",) - name: str = Field(description="Name of the scenario.", alias="schema__name") + alias="context_", + ) + type: str = Field( + sa_column_kwargs={"server_default": "dcat:Dataset"}, + description="a structured set of data", + alias="type_", + ) + title: str = Field( + sa_column_kwargs={"server_default": "MASTU Scenario dataset"}, + description="the title of the dataset", + alias="dct__title", + ) + id: int = Field( + primary_key=True, + nullable=False, + alias="dct__identifier", + ) + name: str = Field(description="Name of the scenario.", alias="schema__name") class ShotModel(SQLModel, table=True): __tablename__ = "shots" - + context: Dict = Field( sa_column=Column(JSONB), default={}, description="Context mapping vocabulary to IRIs", - alias= "context_") - - type: str = Field(sa_column_kwargs = {"server_default": "dcat:Dataset"}, - description="a structured set of data", alias="type_") - - title: str = Field(sa_column_kwargs = {"server_default": "MASTU Shot dataset"}, - description="the title of the dataset", - alias="dct__title") - + alias="context_", + ) + + type: str = Field( + sa_column_kwargs={"server_default": "dcat:Dataset"}, + description="a structured set of data", + alias="type_", + ) + + title: str = Field( + sa_column_kwargs={"server_default": "MASTU Shot dataset"}, + description="the title of the dataset", + alias="dct__title", + ) + shot_id: int = Field( primary_key=True, index=True, @@ -241,12 +281,12 @@ class ShotModel(SQLModel, table=True): url: str = Field( sa_column=Column(Text), description="The URL to this dataset", - alias="schema__url" + alias="schema__url", ) timestamp: datetime.datetime = Field( description='Time the shot was fired in ISO 8601 format. e.g. "2023‐08‐10T09:51:19+00:00"', - alias="dct__date" + alias="dct__date", ) preshot_description: str = Field( @@ -321,7 +361,6 @@ class ShotModel(SQLModel, table=True): description="The facility (tokamak) that produced this shot. e.g. 'MAST'", ) - signals: List["SignalModel"] = Relationship(back_populates="shot") sources: List["SourceModel"] = Relationship(back_populates="shot") diff --git a/tests/api/test_archive.py b/tests/api/test_archive.py index b7f65a2..0bbb732 100644 --- a/tests/api/test_archive.py +++ b/tests/api/test_archive.py @@ -2,7 +2,7 @@ import pytest -pyuda_import = pytest.importorskip("pyuda") +pyuda_import = pytest.importorskip("pyuda") from src.archive.main import ( # noqa: E402 DatasetReader, @@ -14,21 +14,17 @@ def test_write_diagnostic_signal(benchmark): - source = 'AMC' - output_path = f's3://mast/{source}' - fs_config = Path('~/.s3cfg').expanduser() + source = "AMC" + output_path = f"s3://mast/{source}" + fs_config = Path("~/.s3cfg").expanduser() config = read_config(fs_config) fs = get_file_system(config) reader = DatasetReader() writer = DatasetWriter(output_path, fs=fs) - metadata = { - 'shot': 30420, - 'dataset_item_uuid': 'abc', - 'status': 0 - } - name = 'AMC_PLASMA CURRENT' + metadata = {"shot": 30420, "dataset_item_uuid": "abc", "status": 0} + name = "AMC_PLASMA CURRENT" result = _do_write_signal(metadata, name, reader, writer, True) - assert isinstance(result, dict) \ No newline at end of file + assert isinstance(result, dict)