From 76e7c29f7682881ef54b0ad12c9734b4ba9f3dd6 Mon Sep 17 00:00:00 2001
From: Yasin Tatar
Date: Thu, 17 Aug 2023 14:49:56 +0200
Subject: [PATCH] refactoring: clean up code, extract methods, document some others

---
 quetz/dao.py          | 12 ++++--
 quetz/pkgstores.py    |  4 ++
 quetz/tasks/mirror.py | 89 ++++++++++++++++++++++++++----------------
 3 files changed, 68 insertions(+), 37 deletions(-)

diff --git a/quetz/dao.py b/quetz/dao.py
index c048b8af..19064875 100644
--- a/quetz/dao.py
+++ b/quetz/dao.py
@@ -361,7 +361,7 @@ def cleanup_channel_db(
             Package.channel_name == channel_name
         )
         if package_name:
-            all_packages = all_packages.filter(
+            all_packages = all_packages.join(PackageVersion).filter(
                 PackageVersion.package_name == package_name
             )
         for each_package in all_packages:
@@ -396,7 +396,7 @@ def cleanup_channel_db(
             Package.channel_name == channel_name
         )
         if package_name:
-            all_packages = all_packages.filter(
+            all_packages = all_packages.join(PackageVersion).filter(
                 PackageVersion.package_name == package_name
             )
         for each_package in all_packages:
@@ -429,7 +429,7 @@ def cleanup_channel_db(
             Package.channel_name == channel_name
         )
         if package_name:
-            all_packages = all_packages.filter(
+            all_packages = all_packages.join(PackageVersion).filter(
                 PackageVersion.package_name == package_name
             )
         for x, each_package in enumerate(all_packages):
@@ -595,6 +595,7 @@ def get_channel(self, channel_name: str) -> Optional[Channel]:
         return self.db.query(Channel).filter(Channel.name == channel_name).one_or_none()
 
     def get_package(self, channel_name: str, package_name: str) -> Optional[Package]:
+        print(f"get_package: {channel_name}/{package_name}")
         return (
             self.db.query(Package)
             .join(Channel)
@@ -1026,6 +1027,11 @@ def get_channel_datas(self, channel_name: str):
         )
 
     def assert_size_limits(self, channel_name: str, size: int):
+        """
+        Validate that adding a package of size `size` to channel `channel_name`
+        does not exceed the channel's size limit.
+        Raises: QuotaError
+        """
         channel_size, channel_size_limit = (
             self.db.query(Channel.size, Channel.size_limit)
             .filter(Channel.name == channel_name)
diff --git a/quetz/pkgstores.py b/quetz/pkgstores.py
index 5ce49497..6802dc0a 100644
--- a/quetz/pkgstores.py
+++ b/quetz/pkgstores.py
@@ -67,6 +67,10 @@ def support_redirect(self) -> bool:
 
     @abc.abstractmethod
     def create_channel(self, name):
+        """
+        Create a channel with the given name; depending on the backend,
+        this creates e.g. a directory or a bucket.
+        """
         pass
 
     @abc.abstractmethod
diff --git a/quetz/tasks/mirror.py b/quetz/tasks/mirror.py
index b4bb0b95..2799c451 100644
--- a/quetz/tasks/mirror.py
+++ b/quetz/tasks/mirror.py
@@ -202,6 +202,26 @@ def download_file(remote_repository, path_metadata):
     return f, package_name, metadata
 
 
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=4, max=10),
+    after=after_log(logger, logging.WARNING),
+)
+def _upload_package(file, channel_name, subdir, pkgstore):
+    dest = os.path.join(subdir, file.filename)
+
+    try:
+        file.file.seek(0)
+        logger.debug(
+            f"uploading file {dest} from channel {channel_name} to package store"
+        )
+        pkgstore.add_package(file.file, channel_name, dest)
+
+    except AttributeError as e:
+        logger.error(f"Could not upload {file}, {file.filename}. {str(e)}")
{str(e)}") + raise TryAgain + + def handle_repodata_package( channel, files_metadata, @@ -217,6 +237,7 @@ def handle_repodata_package( proxylist = channel.load_channel_metadata().get('proxylist', []) user_id = auth.assert_user() + # check package format and permissions, calculate total size total_size = 0 for file, package_name, metadata in files_metadata: parts = file.filename.rsplit("-", 2) @@ -240,38 +261,24 @@ def handle_repodata_package( total_size += size file.file.seek(0) - dao.assert_size_limits(channel_name, total_size) - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - after=after_log(logger, logging.WARNING), - ) - def _upload_package(file, channel_name, subdir): - dest = os.path.join(subdir, file.filename) - - try: - file.file.seek(0) - logger.debug( - f"uploading file {dest} from channel {channel_name} to package store" - ) - pkgstore.add_package(file.file, channel_name, dest) + # create package in database + # channel_data = _load_remote_channel_data(remote_repository) + # create_packages_from_channeldata(channel_name, user_id, channel_data, dao) - except AttributeError as e: - logger.error(f"Could not upload {file}, {file.filename}. {str(e)}") - raise TryAgain + # validate quota + dao.assert_size_limits(channel_name, total_size) pkgstore.create_channel(channel_name) - nthreads = config.general_package_unpack_threads with TicToc("upload file without extracting"): + nthreads = config.general_package_unpack_threads with ThreadPoolExecutor(max_workers=nthreads) as executor: for file, package_name, metadata in files_metadata: if proxylist and package_name in proxylist: # skip packages that should only ever be proxied continue subdir = get_subdir_compat(metadata) - executor.submit(_upload_package, file, channel_name, subdir) + executor.submit(_upload_package, file, channel_name, subdir, pkgstore) with TicToc("add versions to the db"): for file, package_name, metadata in files_metadata: @@ -618,6 +625,24 @@ def create_versions_from_repodata( create_version_from_metadata(channel_name, user_id, filename, metadata, dao) +def _load_remote_channel_data(remote_repository: RemoteRepository) -> dict: + """ + given the remote repository, load the channeldata.json file + raises: HTTPException if the remote server is unavailable + """ + try: + channel_data = remote_repository.open("channeldata.json").json() + except (RemoteFileNotFound, json.JSONDecodeError): + channel_data = {} + except RemoteServerError as e: + logger.error(f"Remote server error: {e}") + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Remote channel {remote_repository.host} unavailable", + ) + return channel_data + + def synchronize_packages( channel_name: str, dao: Dao, @@ -628,6 +653,14 @@ def synchronize_packages( excludelist: List[str] = None, use_repodata: bool = False, ): + """synchronize package from a remote channel. + + Args: + channel_name (str): the channel to be updated, e.g. 
+        dao (Dao): database access object
+        pkgstore (PackageStore): the target channel's package store
+        use_repodata (bool, optional): whether to create packages from repodata.json
+    """
     logger.info(f"executing synchronize_packages task in a process {os.getpid()}")
 
     new_channel = dao.get_channel(channel_name)
@@ -639,22 +668,14 @@ def synchronize_packages(
     for mirror_channel_url in new_channel.mirror_channel_urls:
         remote_repo = RemoteRepository(mirror_channel_url, session)
 
         user_id = auth.assert_user()
 
-        try:
-            channel_data = remote_repo.open("channeldata.json").json()
-            if use_repodata:
-                create_packages_from_channeldata(
-                    channel_name, user_id, channel_data, dao
-                )
+        channel_data = _load_remote_channel_data(remote_repo)
+        subdirs = None
+        if use_repodata:
+            create_packages_from_channeldata(channel_name, user_id, channel_data, dao)
             subdirs = channel_data.get("subdirs", [])
-        except (RemoteFileNotFound, json.JSONDecodeError):
-            subdirs = None
-        except RemoteServerError:
-            raise HTTPException(
-                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
-                detail=f"Remote channel {mirror_channel_url} unavailable",
-            )
+
+        # if no channel data use known architectures
         if subdirs is None:
             subdirs = KNOWN_SUBDIRS
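
The extracted module-level `_upload_package` keeps the same tenacity retry
policy that the nested closure used: up to three attempts, exponential backoff
between 4 and 10 seconds, and a warning logged after each failed attempt, with
`TryAgain` forcing a retry. A minimal, self-contained sketch of that policy for
reference (the `flaky_upload` function and its attempt counter are
hypothetical, purely for illustration; only the tenacity calls mirror the
patch):

    import logging

    from tenacity import (
        TryAgain,
        after_log,
        retry,
        stop_after_attempt,
        wait_exponential,
    )

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)

    attempts = {"count": 0}

    @retry(
        stop=stop_after_attempt(3),  # give up after the third attempt
        wait=wait_exponential(multiplier=1, min=4, max=10),  # back off 4-10 s
        after=after_log(logger, logging.WARNING),  # log each failed attempt
    )
    def flaky_upload():
        # Fail twice, then succeed, to exercise the retry policy.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise TryAgain  # tenacity catches this and schedules a retry
        return "uploaded"

    print(flaky_upload())  # two logged retries, then prints "uploaded"

If all three attempts fail, tenacity raises RetryError, which is how a
persistent upload failure would surface from `_upload_package` as well.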