From 301aeb75ae173f3569626e78651f24c29b717a0a Mon Sep 17 00:00:00 2001 From: Vamsi Thiriveeedhi Date: Sat, 30 Mar 2024 23:21:34 -0400 Subject: [PATCH] test sync efficiency use popen fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now fix: generate only csv for now enh: improve robustness of download progress maximize the potential of s5cmd sync to avoid downloading the same data again manifest validator function now also updates the input manifest with download directory to make optimal use of s5cmd sync, a dry run is performed before the actual download, the output from dry run sync is parsed to generate a synced manifest download_dicom_series utilizes download_from_selection to make of sync logic instead of duplicating the code add additional endpoints download_dicom_studies, download_dicom_patients, download_dicom_patients and download_collection, all of them routed to download_from_selection --- idc_index/index.py | 349 +++++++++++++++++++++++++++++---------------- 1 file changed, 224 insertions(+), 125 deletions(-) diff --git a/idc_index/index.py b/idc_index/index.py index 23e1b102..a8671a0f 100644 --- a/idc_index/index.py +++ b/idc_index/index.py @@ -98,7 +98,8 @@ def _filter_by_dicom_series_uid(df_index, dicom_series_uid): "SeriesInstanceUID", df_index, dicom_series_uid ) - def get_idc_version(self): + @staticmethod + def get_idc_version(): return f"v{idc_index_data.__version__}" def get_collections(self): @@ -267,8 +268,9 @@ def get_dicom_series(self, studyInstanceUID=None, outputFormat="dict"): return response + @staticmethod def _track_download_progress( - self, size_MB: int, downloadDir: str, process: subprocess.Popen + size_MB: int, downloadDir: str, process: subprocess.Popen ): """ Track progress by continuously checking the downloaded file size and updating the progress bar. @@ -315,42 +317,6 @@ def _track_download_progress( logger.debug("Successfully downloaded files to %s", str(downloadDir)) - def download_dicom_series(self, seriesInstanceUID: str, downloadDir: str) -> None: - """ - Download the files corresponding to the seriesInstanceUID to the specified directory. - - Returns: None - - """ - series_df = self.index[self.index["SeriesInstanceUID"] == seriesInstanceUID] - if series_df.empty: - error_message = ( - f"No series found with the SeriesInstanceUID '{seriesInstanceUID}'." - ) - raise ValueError(error_message) - - # Start the download process - series_url = self.index[self.index["SeriesInstanceUID"] == seriesInstanceUID][ - "series_aws_url" - ].iloc[0] - series_size_MB = self.index[ - self.index["SeriesInstanceUID"] == seriesInstanceUID - ]["series_size_MB"].iloc[0] - cmd = [ - self.s5cmdPath, - "--no-sign-request", - "--endpoint-url", - aws_endpoint_url, - "sync", - series_url, - downloadDir, - ] - with subprocess.Popen( - cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True - ) as process: - # Track progress using tqdm - self._track_download_progress(series_size_MB, downloadDir, process) - def get_series_file_URLs(self, seriesInstanceUID): """ Get the URLs of the files corresponding to the DICOM instances in a given SeriesInstanceUID. @@ -521,9 +487,9 @@ def _get_series_size_from_crdc_series_uuid( """ return duckdb.query(series_size_sql).to_df().series_size_MB.iloc[0] - def _validate_manifest_and_get_download_size( - self, manifestFile: str - ) -> tuple[float, str]: + def _validate_update_manifest_and_get_download_size( + self, manifestFile, downloadDir + ) -> tuple[float, str, Path]: """ Validates the manifest file by checking the URLs and their availability. The function reads the manifest file line by line. For each line, it checks if @@ -531,58 +497,47 @@ def _validate_manifest_and_get_download_size( Uses the s5cmd to check the availability of the URLs in both AWS and GCP. If the URL is not accessible in either AWS or GCP, it raises a ValueError. In addition it also calculates the total size of all series in the manifest file. + Lastly, updates the manifest from generic download directory to explicitly mentioned directory Args: manifestFile (str): The path to the manifest file. + downloadDir (str): The path to the download directory. Returns: total_size (float): The total size of all series in the manifest file. endpoint_to_use (str): The endpoint URL to use (either AWS or GCP). + temp_manifest_file(Path): Path to the temporary manifest file for downstream steps Raises: ValueError: If the manifest file does not exist, if any URL in the manifest file is invalid, or if any URL is inaccessible in both AWS and GCP. Exception: If the manifest contains URLs from both AWS and GCP. """ - if not os.path.exists(manifestFile): - raise ValueError("Manifest does not exist.") endpoint_to_use = None aws_found = False gcp_found = False total_size = 0 - with open(manifestFile) as f: - for line in f: - if not line.startswith("#"): - series_folder_pattern = r"(s3:\/\/.*)\/\*" - match = re.search(series_folder_pattern, line) - if match is None: - raise ValueError("Invalid URL format in manifest file.") - folder_url = match.group(1) - - # Extract CRDC UUID from the line - crdc_series_uuid_pattern = r"(?:.*?\/){3}([^\/?#]+)" - match_uuid = re.search(crdc_series_uuid_pattern, line) - if match_uuid is None: - raise ValueError("Invalid URL format in manifest file.") - crdc_series_uuid = match_uuid.group(1) - - # Check AWS endpoint - cmd = [ - "s5cmd", - "--no-sign-request", - "--endpoint-url", - aws_endpoint_url, - "ls", - folder_url, - ] - process = subprocess.run( - cmd, capture_output=True, text=True, check=False - ) - if process.stderr and process.stderr.startswith("ERROR"): - # Check GCP endpoint + with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_manifest_file: + with open(manifestFile) as f: + for line in f: + if line and not line.startswith("#"): + series_folder_pattern = r"(s3:\/\/.*)\/\*" + match = re.search(series_folder_pattern, line) + if match is None: + raise ValueError("Invalid URL format in manifest file.") + folder_url = match.group(1) + + # Extract CRDC UUID from the line + crdc_series_uuid_pattern = r"(?:.*?\/){3}([^\/?#]+)" + match_uuid = re.search(crdc_series_uuid_pattern, line) + if match_uuid is None: + raise ValueError("Invalid URL format in manifest file.") + crdc_series_uuid = match_uuid.group(1) + + # Check AWS endpoint cmd = [ - "s5cmd", + self.s5cmdPath, "--no-sign-request", "--endpoint-url", - gcp_endpoint_url, + aws_endpoint_url, "ls", folder_url, ] @@ -590,94 +545,168 @@ def _validate_manifest_and_get_download_size( cmd, capture_output=True, text=True, check=False ) if process.stderr and process.stderr.startswith("ERROR"): - error_message = f"Manifest contains invalid or inaccessible URLs. Please check line '{line}'" - raise ValueError(error_message) + # Check GCP endpoint + cmd = [ + self.s5cmdPath, + "--no-sign-request", + "--endpoint-url", + gcp_endpoint_url, + "ls", + folder_url, + ] + process = subprocess.run( + cmd, capture_output=True, text=True, check=False + ) + if process.stderr and process.stderr.startswith("ERROR"): + error_message = f"Manifest contains invalid or inaccessible URLs. Please check line '{line}'" + raise ValueError(error_message) + else: + if aws_found: + raise RuntimeError( + "The manifest contains URLs from both AWS and GCP. Please use only one provider." + ) + endpoint_to_use = gcp_endpoint_url + gcp_found = True else: - if aws_found: + if gcp_found: raise RuntimeError( "The manifest contains URLs from both AWS and GCP. Please use only one provider." ) - endpoint_to_use = gcp_endpoint_url - gcp_found = True - else: - if gcp_found: - raise RuntimeError( - "The manifest contains URLs from both AWS and GCP. Please use only one provider." - ) - endpoint_to_use = aws_endpoint_url - aws_found = True + endpoint_to_use = aws_endpoint_url + aws_found = True - # Get the size of the series + temp_manifest_file.write( + " sync " + folder_url + "/* " + downloadDir + "\n" + ) + # Get the size of the series + series_size = self._get_series_size_from_crdc_series_uuid( + crdc_series_uuid + ) + total_size += series_size + if not endpoint_to_use: + raise ValueError("No valid URLs found in the manifest.") + + return total_size, endpoint_to_use, temp_manifest_file + + def _parse_s5cmd_sync_output_and_generate_synced_manifest( + self, stdout, downloadDir + ) -> Path: + """ + Parse the output of s5cmd sync --dry-run to extract distinct folders and generate a synced manifest. + + Args: + output (str): The output of s5cmd sync --dry-run command. + downloadDir (str): The directory to download the files to. + + Returns: + Path: The path to the generated synced manifest file. + float: Download size in MB + """ + distinct_folders = set() + distinct_uuids = set() + sync_size = 0 + for line in stdout.splitlines(): + match = re.match(r"cp (s3://[^/]+/[^/]+)/.*", line) + if match: + distinct_folders.add(match.group(1)) + crdc_series_uuid_pattern = r"(?:.*?\/){3}([^\/?#]+)" + match_uuid = re.search(crdc_series_uuid_pattern, line) + crdc_series_uuid = match_uuid.group(1) + if crdc_series_uuid not in distinct_uuids: + distinct_uuids.add(crdc_series_uuid) series_size = self._get_series_size_from_crdc_series_uuid( crdc_series_uuid ) - total_size += series_size - if not endpoint_to_use: - raise ValueError("No valid URLs found in the manifest.") + sync_size += series_size - return total_size, endpoint_to_use + # Generate synced manifest with distinct folders + with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp: + synced_manifest = temp.name + with open(synced_manifest, "w") as f: + for folder_url in distinct_folders: + f.write(f"sync {folder_url}/* {downloadDir}\n") + return synced_manifest, sync_size def download_from_manifest( self, manifestFile: str, downloadDir: str, quiet: bool = True ) -> None: """ Download the manifest file. In a series of steps, the manifest file - is first validated to ensure every line contains a valid urls. It then - gets the total size to be downloaded and runs download process on one - process and download progress on another process. + is first validated to ensure every line contains valid URLs. It then + gets the total size to be downloaded and runs the download process. Args: manifestFile (str): The path to the manifest file. downloadDir (str): The directory to download the files to. - quiet (bool, optional): If True, suppresses the output of the subprocess. Defaults to True. + quiet (bool, optional): If True, suppresses the stderr Raises: ValueError: If the download directory does not exist. """ - total_size, endpoint_to_use = self._validate_manifest_and_get_download_size( - manifestFile - ) - print("Total size:" + str(total_size)) + if not os.path.exists(manifestFile): + raise ValueError("Manifest does not exist.") + downloadDir = os.path.abspath(downloadDir).replace("\\", "/") if not os.path.exists(downloadDir): raise ValueError("Download directory does not exist.") + ( + total_size, + endpoint_to_use, + temp_manifest_file, + ) = self._validate_update_manifest_and_get_download_size( + manifestFile, downloadDir + ) - # Create a temporary manifest file with updated destination directories - with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_manifest_file: - with open(manifestFile) as f: - for line in f: - if not line.startswith("#"): - pattern = r"s3:\/\/.*\*" - match = re.search(pattern, line) - if match is None: - raise ValueError( - "Could not find the bucket URL in the first line of the manifest file." - ) - folder_url = match.group(0) - temp_manifest_file.write( - " sync " + folder_url + " " + downloadDir + "\n" - ) - - cmd = [ - "s5cmd", + dry_run_cmd = [ + self.s5cmdPath, "--no-sign-request", + "--dry-run", "--endpoint-url", endpoint_to_use, "run", temp_manifest_file.name, ] + stdout = subprocess.PIPE if quiet: - stdout = subprocess.DEVNULL stderr = subprocess.DEVNULL else: stderr = subprocess.PIPE - stdout = subprocess.PIPE - with subprocess.Popen( - cmd, stderr=stderr, stdout=stdout, universal_newlines=True - ) as process: - # Track progress using tqdm - self._track_download_progress(total_size, downloadDir, process) + process = subprocess.run( + dry_run_cmd, stdout=stdout, stderr=stderr, text=True, check=False + ) + + if process.stdout: + # Some files need to be downloaded + ( + synced_manifest, + sync_size, + ) = self._parse_s5cmd_sync_output_and_generate_synced_manifest( + stdout=process.stdout, downloadDir=downloadDir + ) + cmd = [ + self.s5cmdPath, + "--no-sign-request", + "--endpoint-url", + endpoint_to_use, + "run", + synced_manifest, + ] + with subprocess.Popen( + cmd, stderr=stderr, stdout=stdout, universal_newlines=True + ) as process: + print("Total requested download size:", round(total_size, 2), " MB") + if sync_size < total_size: + existing_data_size = round(total_size - sync_size, 2) + print( + f"Requested total download size is {total_size}, however {existing_data_size} MB is already present,\n so downloading only remaining {sync_size} MB" + ) + self._track_download_progress(sync_size, downloadDir, process) + else: + self._track_download_progress(total_size, downloadDir, process) + else: + # All requested DICOM files are already present + print(f"All requested DICOM files are already present in {downloadDir}.") def download_from_selection( self, @@ -769,6 +798,76 @@ def download_from_selection( # Delete the manifest file after download os.remove(manifest_file) + def download_dicom_series(self, seriesInstanceUID, downloadDir) -> None: + """ + Download the files corresponding to the seriesInstanceUID to the specified directory. + + Args: + seriesInstanceUID: string or list of strings containing the values of DICOM SeriesInstanceUID to filter by + downloadDir: string containing the path to the directory to download the files to + + Returns: None + + Raises: + TypeError: If seriesInstanceUID(s) passed is(are) not a string or list + + """ + self.download_from_selection( + seriesInstanceUID=seriesInstanceUID, downloadDir=downloadDir + ) + + def download_dicom_studies(self, studyInstanceUID, downloadDir) -> None: + """ + Download the files corresponding to the studyInstanceUID to the specified directory. + + Args: + studyInstanceUID: string or list of strings containing the values of DICOM studyInstanceUID to filter by + downloadDir: string containing the path to the directory to download the files to + + Returns: None + + Raises: + TypeError: If seriesInstanceUID(s) passed is(are) not a string or list + + """ + self.download_from_selection( + studyInstanceUID=studyInstanceUID, downloadDir=downloadDir + ) + + def download_dicom_patients(self, patientId, downloadDir) -> None: + """ + Download the files corresponding to the studyInstanceUID to the specified directory. + + Args: + patientId: string or list of strings containing the values of DICOM patientId to filter by + downloadDir: string containing the path to the directory to download the files to + + Returns: None + + Raises: + TypeError: If patientId(s) passed is(are) not a string or list + + """ + self.download_from_selection(patientId=patientId, downloadDir=downloadDir) + + def download_collection(self, collection_id, downloadDir) -> None: + """ + Download the files corresponding to the studyInstanceUID to the specified directory. + + Args: + collection_id: string or list of strings containing the values of DICOM patientId to filter by + downloadDir: string containing the path to the directory to download the files to + + Returns: None + + Raises: + TypeError: If collection_id(s) passed is(are) not a string or list + + """ + self.download_from_selection( + collection_id=collection_id, downloadDir=downloadDir + ) + def sql_query(self, sql_query): """Execute SQL query against the table in the index using duckdb.