diff --git a/nmdc_automation/import_automation/activity_mapper.py b/nmdc_automation/import_automation/activity_mapper.py
index 87970b51..4240504f 100644
--- a/nmdc_automation/import_automation/activity_mapper.py
+++ b/nmdc_automation/import_automation/activity_mapper.py
@@ -63,6 +63,52 @@ def build_workflows_by_type(self) -> Dict:
         """Builds a dictionary of workflows by their type."""
         return {wf["Type"]: wf for wf in self.import_data["Workflows"]}
 
+
+    def link_sequencing_data_file(self) -> Dict[str, dict]:
+        """
+        Create a link to each sequencing data file if it does not already exist.
+        Return a dictionary of sequencing data object records keyed by md5 checksum.
+        """
+        sequencing_types = ["Metagenome Raw Reads", ]
+        sequencing_import_data = [
+            d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types
+        ]
+        sequencing_data = {}
+        # make the root directory if it does not exist
+        try:
+            os.makedirs(self.root_dir)
+        except FileExistsError:
+            logger.info(f"{self.root_dir} already exists")
+        # link each matching import file into the root directory and build its data object record
+        for data_object_dict in sequencing_import_data:
+            for import_file in self.file_list:
+                import_file = str(import_file)
+                if re.search(data_object_dict["import_suffix"], import_file):
+                    file_destination_name = object_action(
+                        import_file,
+                        data_object_dict["action"],
+                        self.nucelotide_sequencing_id,
+                        data_object_dict["nmdc_suffix"],
+                    )
+                    export_file = os.path.join(self.root_dir, file_destination_name)
+                    try:
+                        os.link(import_file, export_file)
+                        logger.info(f"Linked {import_file} to {export_file}")
+                    except FileExistsError:
+                        logger.info(f"{export_file} already exists")
+                    md5 = get_md5(export_file)
+                    sequencing_data[md5] = {
+                        "name": file_destination_name,
+                        "file_size_bytes": os.stat(export_file).st_size,
+                        "md5_checksum": md5,
+                        "data_object_type": data_object_dict["data_object_type"],
+                        "description": data_object_dict["description"].replace(
+                            "{id}", self.nucelotide_sequencing_id
+                        )
+                    }
+        return sequencing_data
+
+
     def map_sequencing_data(self) -> Tuple[nmdc.Database, Dict]:
         """
         Map sequencing data to an NMDC data object and create an update to be applied to the has_output
diff --git a/nmdc_automation/run_process/run_import.py b/nmdc_automation/run_process/run_import.py
index 6301b3aa..a51e0797 100644
--- a/nmdc_automation/run_process/run_import.py
+++ b/nmdc_automation/run_process/run_import.py
@@ -11,6 +11,7 @@
 from nmdc_automation.import_automation import GoldMapper
 from nmdc_automation.api import NmdcRuntimeApi
+from nmdc_schema.nmdc import Database
 
 logging.basicConfig(level=logging.INFO)
 
@@ -53,55 +54,77 @@ def import_projects(import_file, import_yaml, site_configuration, iteration):
             project_path,
             runtime,
         )
+        db = Database()
+        data_generation_update = None
+
+        # Nucleotide sequencing data
+        logger.info(f"Checking for existing nucleotide_sequencing_id: {nucleotide_sequencing_id}")
+        nucleotide_sequencing = runtime.get_planned_process(nucleotide_sequencing_id)
+        if not nucleotide_sequencing:
+            logger.error(f"nucleotide_sequencing_id {nucleotide_sequencing_id} not found")
+            continue
+        # Check if the nucleotide sequencing has outputs
+        sequence_data = nucleotide_sequencing.get("has_output", [])
+        if sequence_data:
+            logger.info(f"nucleotide_sequencing_id {nucleotide_sequencing_id} has outputs")
+            logger.info(f"Output data objects: {sequence_data}")
+            continue
+        else:
+            logger.info(f"nucleotide_sequencing_id {nucleotide_sequencing_id} has no outputs")
+        # link sequencing data files and create data objects
+        # Initialize the db with the sequencing data object and create an update to be applied
+
+
         # Initialize the db with the sequencing data and create an update to be applied
         # to the sequencing data generation has_output list
-        logger.info("Mapping sequencing data")
-        db, data_generation_update = mapper.map_sequencing_data()
+        # logger.info("Mapping sequencing data")
+        # db, data_generation_update = mapper.map_sequencing_data()
 
         # Map the rest of the data files - single files
-        logger.info("Mapping single data files")
-        db, do_mapping = mapper.map_data(db)
+        # logger.info("Mapping single data files")
+        # db, do_mapping = mapper.map_data(db)
 
         # Map the rest of the data files - multiple files
-        logger.info("Mapping multiple data files")
-        db, do_mapping = mapper.map_data(db, unique=False)
+        # logger.info("Mapping multiple data files")
+        # db, do_mapping = mapper.map_data(db, unique=False)
 
         # map the workflow executions
         logger.info("Mapping workflow executions")
-        db = mapper.map_workflow_executions(db)
+        # db = mapper.map_workflow_executions(db)
 
         # validate the database
-        logger.info("Validating imported data")
-        db_dict = yaml.safe_load(yaml_dumper.dumps(db))
-        del db  # free up memory
-        del do_mapping  # free up memory
-        validation_report = linkml.validator.validate(db_dict, nmdc_materialized)
-        if validation_report.results:
-            logger.error(f"Validation Failed")
-            for result in validation_report.results:
-                logger.error(result.message)
-            raise Exception("Validation Failed")
-        else:
-            logger.info("Validation Passed")
+        # logger.info("Validating imported data")
+        # db_dict = yaml.safe_load(yaml_dumper.dumps(db))
+        # del db  # free up memory
+        # del do_mapping  # free up memory
+        # validation_report = linkml.validator.validate(db_dict, nmdc_materialized)
+        # if validation_report.results:
+        #     logger.error(f"Validation Failed")
+        #     for result in validation_report.results:
+        #         logger.error(result.message)
+        #     raise Exception("Validation Failed")
+        # else:
+        #     logger.info("Validation Passed")
 
         # apply the update to the sequencing data generation has_output list
-        logger.info("Applying update to sequencing data generation")
-        try:
-            runtime.run_query(data_generation_update)
-        except Exception as e:
-            logger.error(f"Error applying update to sequencing data generation: {e}")
-            logger.error(data_generation_update)
-            raise e
+        # logger.info("Applying update to sequencing data generation")
+        # try:
+        #     runtime.run_query(data_generation_update)
+        # except Exception as e:
+        #     logger.error(f"Error applying update to sequencing data generation: {e}")
+        #     logger.error(data_generation_update)
+        #     raise e
 
         # Post the data to the API
         logger.info("Posting data to the API")
-        try:
-            runtime.post_objects(db_dict)
-            del db_dict  # free up memory
-        except Exception as e:
-            logger.error(f"Error posting data to the API: {e}")
-            raise e
-
-        gc.collect()
+        # try:
+        #     runtime.post_objects(db_dict)
+        #     del db_dict  # free up memory
+        # except Exception as e:
+        #     logger.error(f"Error posting data to the API: {e}")
+        #     raise e
+        #
+        # gc.collect()
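
Note: a minimal sketch (not part of the diff) of how the record dictionary returned by link_sequencing_data_file() might be consumed to rebuild the commented-out has_output update above. The mint_id callable and the exact update-document shape expected by runtime.run_query() are assumptions for illustration, not APIs confirmed by this change.

import logging

logger = logging.getLogger(__name__)


def build_has_output_update(sequencing_data: dict, nucleotide_sequencing_id: str, mint_id) -> dict:
    """Sketch: turn linked sequencing file records into a has_output update document."""
    data_object_ids = []
    for md5, record in sequencing_data.items():
        # mint_id is a hypothetical stand-in for however data object IDs are minted
        data_object_id = mint_id(record)
        data_object_ids.append(data_object_id)
        logger.info(
            "Prepared data object %s for %s (%s bytes, md5 %s)",
            data_object_id, record["name"], record["file_size_bytes"], md5,
        )
    # Mongo-style update document; the shape runtime.run_query() expects is assumed here
    return {
        "update": "data_generation_set",
        "updates": [
            {
                "q": {"id": nucleotide_sequencing_id},
                "u": {"$set": {"has_output": data_object_ids}},
            }
        ],
    }

Inside the import loop this could be called as data_generation_update = build_has_output_update(mapper.link_sequencing_data_file(), nucleotide_sequencing_id, mint_id) once an ID-minting helper is in place.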