From 401c8414e803e2c789226779eb34905343ce84da Mon Sep 17 00:00:00 2001 From: Vamsi Thiriveedhi Date: Thu, 2 May 2024 06:46:48 -0400 Subject: [PATCH] enh: make sql query efficient and write to csv instead of iterator make download tests more robust by testing all reasonable combinations --- .github/workflows/ci.yml | 2 +- idc_index/index.py | 39 ++++++++------ tests/idcindex.py | 113 +++++++++++++++++++++++++++++++++------ 3 files changed, 122 insertions(+), 32 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5f324efc..e71834f8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,7 +59,7 @@ jobs: - name: Test package run: >- python -m pytest -ra --cov --cov-report=xml --cov-report=term - --durations=20 + --durations=20 -vv # - name: Upload coverage report # uses: codecov/codecov-action@v4.1.0 diff --git a/idc_index/index.py b/idc_index/index.py index 3083dc48..3771aa07 100644 --- a/idc_index/index.py +++ b/idc_index/index.py @@ -19,6 +19,7 @@ from tqdm import tqdm logger = logging.getLogger(__name__) +logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.DEBUG) aws_endpoint_url = "https://s3.amazonaws.com" gcp_endpoint_url = "https://storage.googleapis.com" @@ -381,7 +382,7 @@ def get_viewer_URL( available in IDC viewer_selector: string containing the name of the viewer to use. Must be one of the following: - ohif_v2, ohif_v2, or slim. If not provided, default viewers will be used. + ohif_v2, ohif_v3, or slim. If not provided, default viewers will be used. Returns: string containing the IDC viewer URL for the given SeriesInstanceUID @@ -487,6 +488,7 @@ def _validate_update_manifest_and_get_download_size( """ logger.debug("manifest validation is requested: " + str(validate_manifest)) + print("Parsing the manifest. Please wait..") # Read the manifest as a csv file manifest_df = pd.read_csv( manifestFile, comment="#", skip_blank_lines=True, header=None @@ -506,19 +508,23 @@ def _validate_update_manifest_and_get_download_size( PRAGMA disable_progress_bar; with index_temp as (select - *, + seriesInstanceUID, + series_aws_url, + series_size_MB, regexp_extract(series_aws_url, '(?:.*?\\/){3}([^\\/?#]+)', 1) index_crdc_series_uuid from index_df_copy), manifest_temp as ( select manifest_cp_cmd, regexp_extract(manifest_cp_cmd, '(?:.*?\\/){3}([^\\/?#]+)', 1) as manifest_crdc_series_uuid, - regexp_replace(regexp_replace(manifest_cp_cmd, 'cp ', ''),' .','') as s3_url + regexp_replace(regexp_replace(manifest_cp_cmd, 'cp ', ''), '\\s[^\\s]*$', '') as s3_url, from manifest_df ) select - *, + seriesInstanceuid, + s3_url, + series_size_MB, index_crdc_series_uuid==manifest_crdc_series_uuid as crdc_series_uuid_match, s3_url==series_aws_url as s3_url_match, CASE WHEN s3_url==series_aws_url THEN 'aws' ELSE 'unknown' END as endpoint @@ -584,9 +590,9 @@ def _validate_update_manifest_and_get_download_size( # Write a temporary manifest file with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_manifest_file: - for s3_url in merged_df["s3_url"]: - temp_manifest_file.write(f"cp {s3_url} {downloadDir}\n") - + merged_df["s5cmd_cmd"] = "cp " + merged_df["s3_url"] + " " + downloadDir + merged_df["s5cmd_cmd"].to_csv(temp_manifest_file, header=False, index=False) + print("Parsing the manifest is finished. Download will begin soon") return total_size, endpoint_to_use, Path(temp_manifest_file.name) @staticmethod @@ -694,7 +700,10 @@ def _parse_s5cmd_sync_output_and_generate_synced_manifest( stdout_df ) select - * + distinct + seriesInstanceUID, + series_size_MB, + s3_url from sync_temp left join index_temp on index_temp.index_crdc_series_uuid = sync_temp.sync_crdc_instance_uuid @@ -707,9 +716,9 @@ def _parse_s5cmd_sync_output_and_generate_synced_manifest( # Write a temporary manifest file with tempfile.NamedTemporaryFile(mode="w", delete=False) as synced_manifest: - for s3_url in merged_df["s3_url"]: - synced_manifest.write(f"sync {s3_url} {downloadDir}\n") - logger.info("Parsing the s5cmd sync dry run output finished") + merged_df["s5cmd_cmd"] = "sync " + merged_df["s3_url"] + " " + downloadDir + merged_df["s5cmd_cmd"].to_csv(synced_manifest, header=False, index=False) + logger.info("Parsing the s5cmd sync dry run output finished") return Path(synced_manifest.name), sync_size_rounded def _s5cmd_run( @@ -1019,10 +1028,10 @@ def download_from_selection( # Download the files # make temporary file to store the list of files to download with tempfile.NamedTemporaryFile(mode="w", delete=False) as manifest_file: - for index, row in result_df.iterrows(): - manifest_file.write( - "cp " + row["series_aws_url"] + " " + downloadDir + "\n" - ) + result_df["s5cmd_cmd"] = ( + "cp " + result_df["series_aws_url"] + " " + downloadDir + ) + result_df["s5cmd_cmd"].to_csv(manifest_file, header=False, index=False) logger.info( """ Temporary download manifest is generated and is passed to self._s5cmd_run diff --git a/tests/idcindex.py b/tests/idcindex.py index 437faf4a..af3f66b4 100644 --- a/tests/idcindex.py +++ b/tests/idcindex.py @@ -4,6 +4,7 @@ import os import tempfile import unittest +from itertools import product import pytest from idc_index import index @@ -87,13 +88,41 @@ def test_download_dicom_series(self): self.assertNotEqual(len(os.listdir(temp_dir)), 0) def test_download_from_selection(self): - with tempfile.TemporaryDirectory() as temp_dir: - self.client.download_from_selection( - studyInstanceUID="1.3.6.1.4.1.14519.5.2.1.6279.6001.175012972118199124641098335511", - downloadDir=temp_dir, - ) + # Define the values for each optional parameter + dry_run_values = [True, False] + quiet_values = [True, False] + show_progress_bar_values = [True, False] + use_s5cmd_sync_dry_run_values = [True, False] + + # Generate all combinations of optional parameters + combinations = product( + dry_run_values, + quiet_values, + show_progress_bar_values, + use_s5cmd_sync_dry_run_values, + ) - self.assertNotEqual(len(os.listdir(temp_dir)), 0) + # Test each combination + for ( + dry_run, + quiet, + show_progress_bar, + use_s5cmd_sync_dry_run, + ) in combinations: + with tempfile.TemporaryDirectory() as temp_dir: + self.client.download_from_selection( + downloadDir=temp_dir, + dry_run=dry_run, + patientId=None, + studyInstanceUID="1.3.6.1.4.1.14519.5.2.1.6279.6001.175012972118199124641098335511", + seriesInstanceUID=None, + quiet=quiet, + show_progress_bar=show_progress_bar, + use_s5cmd_sync_dry_run=use_s5cmd_sync_dry_run, + ) + + if not dry_run: + self.assertNotEqual(len(os.listdir(temp_dir)), 0) def test_sql_queries(self): df = self.client.sql_query("SELECT DISTINCT(collection_id) FROM index") @@ -101,20 +130,72 @@ def test_sql_queries(self): self.assertIsNotNone(df) def test_download_from_aws_manifest(self): - with tempfile.TemporaryDirectory() as temp_dir: - self.client.download_from_manifest( - manifestFile="./study_manifest_aws.s5cmd", downloadDir=temp_dir - ) + # Define the values for each optional parameter + quiet_values = [True, False] + validate_manifest_values = [True, False] + show_progress_bar_values = [True, False] + use_s5cmd_sync_dry_run_values = [True, False] + + # Generate all combinations of optional parameters + combinations = product( + quiet_values, + validate_manifest_values, + show_progress_bar_values, + use_s5cmd_sync_dry_run_values, + ) - self.assertEqual(len(os.listdir(temp_dir)), 15) + # Test each combination + for ( + quiet, + validate_manifest, + show_progress_bar, + use_s5cmd_sync_dry_run, + ) in combinations: + with tempfile.TemporaryDirectory() as temp_dir: + self.client.download_from_manifest( + manifestFile="./study_manifest_aws.s5cmd", + downloadDir=temp_dir, + quiet=quiet, + validate_manifest=validate_manifest, + show_progress_bar=show_progress_bar, + use_s5cmd_sync_dry_run=use_s5cmd_sync_dry_run, + ) + + self.assertEqual(len(os.listdir(temp_dir)), 15) def test_download_from_gcp_manifest(self): - with tempfile.TemporaryDirectory() as temp_dir: - self.client.download_from_manifest( - manifestFile="./study_manifest_gcs.s5cmd", downloadDir=temp_dir - ) + # Define the values for each optional parameter + quiet_values = [True, False] + validate_manifest_values = [True, False] + show_progress_bar_values = [True, False] + use_s5cmd_sync_dry_run_values = [True, False] + + # Generate all combinations of optional parameters + combinations = product( + quiet_values, + validate_manifest_values, + show_progress_bar_values, + use_s5cmd_sync_dry_run_values, + ) - self.assertEqual(len(os.listdir(temp_dir)), 15) + # Test each combination + for ( + quiet, + validate_manifest, + show_progress_bar, + use_s5cmd_sync_dry_run, + ) in combinations: + with tempfile.TemporaryDirectory() as temp_dir: + self.client.download_from_manifest( + manifestFile="./study_manifest_gcs.s5cmd", + downloadDir=temp_dir, + quiet=quiet, + validate_manifest=validate_manifest, + show_progress_bar=show_progress_bar, + use_s5cmd_sync_dry_run=use_s5cmd_sync_dry_run, + ) + + self.assertEqual(len(os.listdir(temp_dir)), 15) if __name__ == "__main__":