Skip to content

Commit

Permalink
enh: make the SQL queries more efficient and write manifests with to_csv instead of iterating over rows
Browse files (browse the repository at this point in the history)
make download tests more robust by testing all reasonable combinations
  • Loading branch information
vkt1414 committed May 2, 2024
1 parent 84b0806 commit 2e2bf83
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- name: Test package
run: >-
python -m pytest -ra --cov --cov-report=xml --cov-report=term
--durations=20
--durations=20 -vv
# - name: Upload coverage report
# uses: codecov/[email protected]
Expand Down
39 changes: 24 additions & 15 deletions idc_index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from tqdm import tqdm

logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.DEBUG)

aws_endpoint_url = "https://s3.amazonaws.com"
gcp_endpoint_url = "https://storage.googleapis.com"
Expand Down Expand Up @@ -381,7 +382,7 @@ def get_viewer_URL(
available in IDC
viewer_selector: string containing the name of the viewer to use. Must be one of the following:
ohif_v2, ohif_v2, or slim. If not provided, default viewers will be used.
ohif_v2, ohif_v3, or slim. If not provided, default viewers will be used.
Returns:
string containing the IDC viewer URL for the given SeriesInstanceUID
Expand Down Expand Up @@ -487,6 +488,7 @@ def _validate_update_manifest_and_get_download_size(
"""
logger.debug("manifest validation is requested: " + str(validate_manifest))

print("Parsing the manifest. Please wait..")
# Read the manifest as a csv file
manifest_df = pd.read_csv(
manifestFile, comment="#", skip_blank_lines=True, header=None
Expand All @@ -506,19 +508,23 @@ def _validate_update_manifest_and_get_download_size(
PRAGMA disable_progress_bar;
with index_temp as
(select
*,
seriesInstanceUID,
series_aws_url,
series_size_MB,
regexp_extract(series_aws_url, '(?:.*?\\/){3}([^\\/?#]+)', 1) index_crdc_series_uuid
from index_df_copy),
manifest_temp as (
select
manifest_cp_cmd,
regexp_extract(manifest_cp_cmd, '(?:.*?\\/){3}([^\\/?#]+)', 1) as manifest_crdc_series_uuid,
regexp_replace(regexp_replace(manifest_cp_cmd, 'cp ', ''),' .','') as s3_url
regexp_replace(regexp_replace(manifest_cp_cmd, 'cp ', ''), '\\s[^\\s]*$', '') as s3_url,
from
manifest_df
)
select
*,
seriesInstanceuid,
s3_url,
series_size_MB,
index_crdc_series_uuid==manifest_crdc_series_uuid as crdc_series_uuid_match,
s3_url==series_aws_url as s3_url_match,
CASE WHEN s3_url==series_aws_url THEN 'aws' ELSE 'unknown' END as endpoint
Expand Down Expand Up @@ -584,9 +590,9 @@ def _validate_update_manifest_and_get_download_size(

# Write a temporary manifest file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_manifest_file:
for s3_url in merged_df["s3_url"]:
temp_manifest_file.write(f"cp {s3_url} {downloadDir}\n")

merged_df["s5cmd_cmd"] = "cp " + merged_df["s3_url"] + " " + downloadDir
merged_df["s5cmd_cmd"].to_csv(temp_manifest_file, header=False, index=False)
print("Parsing the manifest is finished. Download will begin soon")
return total_size, endpoint_to_use, Path(temp_manifest_file.name)

@staticmethod
Expand Down Expand Up @@ -694,7 +700,10 @@ def _parse_s5cmd_sync_output_and_generate_synced_manifest(
stdout_df
)
select
*
distinct
seriesInstanceUID,
series_size_MB,
s3_url
from
sync_temp
left join index_temp on index_temp.index_crdc_series_uuid = sync_temp.sync_crdc_instance_uuid
Expand All @@ -707,9 +716,9 @@ def _parse_s5cmd_sync_output_and_generate_synced_manifest(

# Write a temporary manifest file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as synced_manifest:
for s3_url in merged_df["s3_url"]:
synced_manifest.write(f"sync {s3_url} {downloadDir}\n")
logger.info("Parsing the s5cmd sync dry run output finished")
merged_df["s5cmd_cmd"] = "sync " + merged_df["s3_url"] + " " + downloadDir
merged_df["s5cmd_cmd"].to_csv(synced_manifest, header=False, index=False)
logger.info("Parsing the s5cmd sync dry run output finished")
return Path(synced_manifest.name), sync_size_rounded

def _s5cmd_run(
Expand Down Expand Up @@ -1019,10 +1028,10 @@ def download_from_selection(
# Download the files
# make temporary file to store the list of files to download
with tempfile.NamedTemporaryFile(mode="w", delete=False) as manifest_file:
for index, row in result_df.iterrows():
manifest_file.write(
"cp " + row["series_aws_url"] + " " + downloadDir + "\n"
)
result_df["s5cmd_cmd"] = (
"cp " + result_df["series_aws_url"] + " " + downloadDir
)
result_df["s5cmd_cmd"].to_csv(manifest_file, header=False, index=False)
logger.info(
"""
Temporary download manifest is generated and is passed to self._s5cmd_run
Expand Down
113 changes: 97 additions & 16 deletions tests/idcindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import tempfile
import unittest
from itertools import product

import pytest
from idc_index import index
Expand Down Expand Up @@ -87,34 +88,114 @@ def test_download_dicom_series(self):
self.assertNotEqual(len(os.listdir(temp_dir)), 0)

def test_download_from_selection(self):
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_from_selection(
studyInstanceUID="1.3.6.1.4.1.14519.5.2.1.6279.6001.175012972118199124641098335511",
downloadDir=temp_dir,
)
# Define the values for each optional parameter
dry_run_values = [True, False]
quiet_values = [True, False]
show_progress_bar_values = [True, False]
use_s5cmd_sync_dry_run_values = [True, False]

# Generate all combinations of optional parameters
combinations = product(
dry_run_values,
quiet_values,
show_progress_bar_values,
use_s5cmd_sync_dry_run_values,
)

self.assertNotEqual(len(os.listdir(temp_dir)), 0)
# Test each combination
for (
dry_run,
quiet,
show_progress_bar,
use_s5cmd_sync_dry_run,
) in combinations:
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_from_selection(
downloadDir=temp_dir,
dry_run=dry_run,
patientId=None,
studyInstanceUID="1.3.6.1.4.1.14519.5.2.1.6279.6001.175012972118199124641098335511",
seriesInstanceUID=None,
quiet=quiet,
show_progress_bar=show_progress_bar,
use_s5cmd_sync_dry_run=use_s5cmd_sync_dry_run,
)

if not dry_run:
self.assertNotEqual(len(os.listdir(temp_dir)), 0)

def test_sql_queries(self):
    """Run a DISTINCT collection_id query through the client and assert the result is not None."""
    collections = self.client.sql_query("SELECT DISTINCT(collection_id) FROM index")
    self.assertIsNotNone(collections)

def test_download_from_aws_manifest(self):
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_from_manifest(
manifestFile="./study_manifest_aws.s5cmd", downloadDir=temp_dir
)
# Define the values for each optional parameter
quiet_values = [True, False]
validate_manifest_values = [True, False]
show_progress_bar_values = [True, False]
use_s5cmd_sync_dry_run_values = [True, False]

# Generate all combinations of optional parameters
combinations = product(
quiet_values,
validate_manifest_values,
show_progress_bar_values,
use_s5cmd_sync_dry_run_values,
)

self.assertEqual(len(os.listdir(temp_dir)), 15)
# Test each combination
for (
quiet,
validate_manifest,
show_progress_bar,
use_s5cmd_sync_dry_run,
) in combinations:
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_from_manifest(
manifestFile="./study_manifest_aws.s5cmd",
downloadDir=temp_dir,
quiet=quiet,
validate_manifest=validate_manifest,
show_progress_bar=show_progress_bar,
use_s5cmd_sync_dry_run=use_s5cmd_sync_dry_run,
)

self.assertEqual(len(os.listdir(temp_dir)), 15)

def test_download_from_gcp_manifest(self):
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_from_manifest(
manifestFile="./study_manifest_gcs.s5cmd", downloadDir=temp_dir
)
# Define the values for each optional parameter
quiet_values = [True, False]
validate_manifest_values = [True, False]
show_progress_bar_values = [True, False]
use_s5cmd_sync_dry_run_values = [True, False]

# Generate all combinations of optional parameters
combinations = product(
quiet_values,
validate_manifest_values,
show_progress_bar_values,
use_s5cmd_sync_dry_run_values,
)

self.assertEqual(len(os.listdir(temp_dir)), 15)
# Test each combination
for (
quiet,
validate_manifest,
show_progress_bar,
use_s5cmd_sync_dry_run,
) in combinations:
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_from_manifest(
manifestFile="./study_manifest_gcs.s5cmd",
downloadDir=temp_dir,
quiet=quiet,
validate_manifest=validate_manifest,
show_progress_bar=show_progress_bar,
use_s5cmd_sync_dry_run=use_s5cmd_sync_dry_run,
)

self.assertEqual(len(os.listdir(temp_dir)), 15)


if __name__ == "__main__":
Expand Down

0 comments on commit 2e2bf83

Please sign in to comment.