add link_sequencing_data_file method
mbthornton-lbl committed Dec 18, 2024
1 parent a653094 commit 12cd1ca
Showing 2 changed files with 103 additions and 34 deletions.
46 changes: 46 additions & 0 deletions nmdc_automation/import_automation/activity_mapper.py
@@ -63,6 +63,52 @@ def build_workflows_by_type(self) -> Dict:
"""Builds a dictionary of workflows by their type."""
return {wf["Type"]: wf for wf in self.import_data["Workflows"]}


def link_sequencing_data_file(self) -> Dict[str, dict]:
    """
    Create a link to each sequencing data file if one does not already exist.
    Return a dictionary of sequencing data object records keyed by MD5 checksum.
    """
    sequencing_types = ["Metagenome Raw Reads", ]
    sequencing_import_data = [
        d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types
    ]
    sequencing_data = {}
    # make the root directory if it does not exist
    try:
        os.makedirs(self.root_dir)
    except FileExistsError:
        logger.info(f"{self.root_dir} already exists")
    for data_object_dict in sequencing_import_data:
        for import_file in self.file_list:
            import_file = str(import_file)
            if re.search(data_object_dict["import_suffix"], import_file):
                file_destination_name = object_action(
                    import_file,
                    data_object_dict["action"],
                    self.nucelotide_sequencing_id,
                    data_object_dict["nmdc_suffix"],
                )
                export_file = os.path.join(self.root_dir, file_destination_name)
                try:
                    os.link(import_file, export_file)
                    logger.info(f"Linked {import_file} to {export_file}")
                except FileExistsError:
                    logger.info(f"{export_file} already exists")
                md5 = get_md5(export_file)
                sequencing_data[md5] = {
                    "name": file_destination_name,
                    "file_size_bytes": os.stat(export_file).st_size,
                    "md5_checksum": md5,
                    "data_object_type": data_object_dict["data_object_type"],
                    "description": data_object_dict["description"].replace(
                        "{id}", self.nucelotide_sequencing_id
                    )
                }
    return sequencing_data



def map_sequencing_data(self) -> Tuple[nmdc.Database, Dict]:
    """
    Map sequencing data to an NMDC data object and create an update to be applied to the has_output
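For orientation, here is a minimal usage sketch of the new method (not part of the commit). It assumes mapper is an already-constructed GoldMapper instance, as in run_import.py below, and relies only on the return shape visible in the diff: a dict keyed by MD5 checksum whose records carry name, file_size_bytes, md5_checksum, data_object_type, and description.

    # Hypothetical usage sketch - not part of this commit.
    # `mapper` is assumed to be a GoldMapper instance (see run_import.py below).
    sequencing_data = mapper.link_sequencing_data_file()
    for md5, record in sequencing_data.items():
        # each record describes one linked sequencing data file
        print(md5, record["name"], record["file_size_bytes"], record["data_object_type"])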
91 changes: 57 additions & 34 deletions nmdc_automation/run_process/run_import.py
@@ -11,6 +11,7 @@

from nmdc_automation.import_automation import GoldMapper
from nmdc_automation.api import NmdcRuntimeApi
from nmdc_schema.nmdc import Database


logging.basicConfig(level=logging.INFO)
@@ -53,55 +54,77 @@ def import_projects(import_file, import_yaml, site_configuration, iteration):
            project_path,
            runtime,
        )
        db = Database()
        data_generation_update = None

        # Nucleotide sequencing data
        logger.info(f"Checking for existing nucleotide_sequencing_id: {nucleotide_sequencing_id}")
        nucleotide_sequencing = runtime.get_planned_process(nucleotide_sequencing_id)
        if not nucleotide_sequencing:
            logger.error(f"nucleotide_sequencing_id {nucleotide_sequencing_id} not found")
            continue
        # Check if the nucleotide sequencing has outputs
        sequence_data = nucleotide_sequencing.get("has_output", [])
        if sequence_data:
            logger.info(f"nucleotide_sequencing_id {nucleotide_sequencing_id} has outputs")
            logger.info(f"Output data objects: {sequence_data}")
            continue
        else:
            logger.info(f"nucleotide_sequencing_id {nucleotide_sequencing_id} has no outputs")
        # link sequencing data files and create data objects
        # Initialize the db with the sequencing data object and create an update to be applied



        # Initialize the db with the sequencing data and create an update to be applied
        # to the sequencing data generation has_output list
        logger.info("Mapping sequencing data")
        db, data_generation_update = mapper.map_sequencing_data()
        # logger.info("Mapping sequencing data")
        # db, data_generation_update = mapper.map_sequencing_data()
        # Map the rest of the data files - single files
        logger.info("Mapping single data files")
        db, do_mapping = mapper.map_data(db)
        # logger.info("Mapping single data files")
        # db, do_mapping = mapper.map_data(db)
        # Map the rest of the data files - multiple files
        logger.info("Mapping multiple data files")
        db, do_mapping = mapper.map_data(db, unique=False)
        # logger.info("Mapping multiple data files")
        # db, do_mapping = mapper.map_data(db, unique=False)

        # map the workflow executions
        logger.info("Mapping workflow executions")
        db = mapper.map_workflow_executions(db)
        # db = mapper.map_workflow_executions(db)

        # validate the database
        logger.info("Validating imported data")
        db_dict = yaml.safe_load(yaml_dumper.dumps(db))
        del db  # free up memory
        del do_mapping  # free up memory
        validation_report = linkml.validator.validate(db_dict, nmdc_materialized)
        if validation_report.results:
            logger.error(f"Validation Failed")
            for result in validation_report.results:
                logger.error(result.message)
            raise Exception("Validation Failed")
        else:
            logger.info("Validation Passed")
        # logger.info("Validating imported data")
        # db_dict = yaml.safe_load(yaml_dumper.dumps(db))
        # del db  # free up memory
        # del do_mapping  # free up memory
        # validation_report = linkml.validator.validate(db_dict, nmdc_materialized)
        # if validation_report.results:
        #     logger.error(f"Validation Failed")
        #     for result in validation_report.results:
        #         logger.error(result.message)
        #     raise Exception("Validation Failed")
        # else:
        #     logger.info("Validation Passed")

        # apply the update to the sequencing data generation has_output list
        logger.info("Applying update to sequencing data generation")
        try:
            runtime.run_query(data_generation_update)
        except Exception as e:
            logger.error(f"Error applying update to sequencing data generation: {e}")
            logger.error(data_generation_update)
            raise e
        # logger.info("Applying update to sequencing data generation")
        # try:
        #     runtime.run_query(data_generation_update)
        # except Exception as e:
        #     logger.error(f"Error applying update to sequencing data generation: {e}")
        #     logger.error(data_generation_update)
        #     raise e


        # Post the data to the API
        logger.info("Posting data to the API")
        try:
            runtime.post_objects(db_dict)
            del db_dict  # free up memory
        except Exception as e:
            logger.error(f"Error posting data to the API: {e}")
            raise e

        gc.collect()
        # try:
        #     runtime.post_objects(db_dict)
        #     del db_dict  # free up memory
        # except Exception as e:
        #     logger.error(f"Error posting data to the API: {e}")
        #     raise e
        #
        # gc.collect()



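For reference, a condensed sketch of the pipeline that the commented-out block above corresponds to, reconstructed from the calls shown in this diff (not part of the commit; logging, deletions, and error handling omitted; yaml, yaml_dumper, linkml, and nmdc_materialized come from the file's surrounding context):

    # Reconstructed sketch - names and call signatures are taken from the diff above.
    db, data_generation_update = mapper.map_sequencing_data()  # sequencing data objects + has_output update
    db, do_mapping = mapper.map_data(db)                       # single data files
    db, do_mapping = mapper.map_data(db, unique=False)         # multi-file data objects
    db = mapper.map_workflow_executions(db)                    # workflow execution records
    db_dict = yaml.safe_load(yaml_dumper.dumps(db))
    validation_report = linkml.validator.validate(db_dict, nmdc_materialized)
    if not validation_report.results:                          # empty results means validation passed
        runtime.run_query(data_generation_update)              # patch the data generation has_output list
        runtime.post_objects(db_dict)                          # post the imported records to the API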
