From 7aac1ded9b2aa9eea7b7e12318eb29c3b4128eeb Mon Sep 17 00:00:00 2001
From: Radovan Zvoncek
Date: Thu, 26 Oct 2023 16:06:18 +0300
Subject: [PATCH] [S3 Storage] Use boto's connection pool so transfers are
 truly parallel

---
 medusa/storage/s3_base_storage.py             | 55 ++++++++++++++-----
 tests/resources/config/medusa-ibm_storage.ini |  1 +
 tests/resources/config/medusa-minio.ini       |  2 +-
 .../config/medusa-s3_us_west_oregon.ini       |  2 +-
 .../medusa-s3_us_west_oregon_encrypted.ini    |  2 +-
 5 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/medusa/storage/s3_base_storage.py b/medusa/storage/s3_base_storage.py
index 7d1be225e..22df5242e 100644
--- a/medusa/storage/s3_base_storage.py
+++ b/medusa/storage/s3_base_storage.py
@@ -13,10 +13,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import asyncio
 import base64
 import boto3
 import botocore.session
+import concurrent.futures
 import logging
 import io
 import os
@@ -114,6 +115,8 @@ def __init__(self, config):
         self.connection_extra_args = self._make_connection_arguments(config)
         self.transfer_config = self._make_transfer_config(config)
 
+        self.executor = concurrent.futures.ThreadPoolExecutor(int(config.concurrent_transfers))
+
         super().__init__(config)
 
     def connect(self):
@@ -125,7 +128,8 @@ def connect(self):
             boto_config = Config(
                 region_name=self.credentials.region,
                 signature_version='v4',
-                tcp_keepalive=True
+                tcp_keepalive=True,
+                max_pool_connections=int(self.config.concurrent_transfers)
             )
             self.s3_client = boto3.client(
                 's3',
@@ -139,6 +143,7 @@ def disconnect(self):
         logging.debug('Disconnecting from S3...')
         try:
             self.s3_client.close()
+            self.executor.shutdown()
         except Exception as e:
             logging.error('Error disconnecting from S3: {}'.format(e))
 
@@ -260,7 +265,15 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
 
     @retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
     async def _download_blob(self, src: str, dest: str):
-        blob = await self._stat_blob(src)
+        # boto has a connection pool, but it does not support the asyncio API
+        # so we make things ugly and submit the whole download as a task to an executor
+        # which allows us to download several files in parallel
+        loop = asyncio.get_event_loop()
+        future = loop.run_in_executor(self.executor, self.__download_blob, src, dest)
+        await future
+
+    def __download_blob(self, src: str, dest: str):
+        blob = self.__stat_blob(src)
         object_key = blob.name
 
         # we must make sure the blob gets stored under sub-folder (if there is any)
@@ -304,6 +317,11 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
             logging.error("An error occurred:", e)
             logging.error('Error getting object from s3://{}/{}'.format(self.bucket_name, object_key))
 
+    def __stat_blob(self, key):
+        resp = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
+        item_hash = resp['ETag'].replace('"', '')
+        return AbstractBlob(key, int(resp['ContentLength']), item_hash, resp['LastModified'])
+
     @retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
     async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
         src_chunks = src.split('/')
@@ -328,18 +346,29 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
             )
         )
 
-        self.s3_client.upload_file(
-            Filename=src,
-            Bucket=self.bucket_name,
-            Key=object_key,
-            Config=self.transfer_config,
-            ExtraArgs=kms_args,
-        )
-
-        blob = await self._stat_blob(object_key)
-        mo = ManifestObject(blob.name, blob.size, blob.hash)
+        upload_conf = {
+            'Filename': src,
+            'Bucket': self.bucket_name,
+            'Key': object_key,
+            'Config': self.transfer_config,
+            'ExtraArgs': kms_args,
+        }
+        # we are going to combine asyncio with boto's threading
+        # we do this by submitting the upload into an executor
+        loop = asyncio.get_event_loop()
+        future = loop.run_in_executor(self.executor, self.__upload_file, upload_conf)
+        # and then ask asyncio to yield until it completes
+        mo = await future
         return mo
 
+    def __upload_file(self, upload_conf):
+        self.s3_client.upload_file(**upload_conf)
+        resp = self.s3_client.head_object(Bucket=upload_conf['Bucket'], Key=upload_conf['Key'])
+        blob_name = upload_conf['Key']
+        blob_size = int(resp['ContentLength'])
+        blob_hash = resp['ETag'].replace('"', '')
+        return ManifestObject(blob_name, blob_size, blob_hash)
+
     async def _get_object(self, object_key: t.Union[Path, str]) -> AbstractBlob:
         blob = await self._stat_blob(str(object_key))
         return blob
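
The core of the change above combines asyncio with boto3's thread-based transfer machinery: each blocking upload_file/download_file call is submitted to a ThreadPoolExecutor through loop.run_in_executor, and max_pool_connections is raised to match the executor size so every worker thread can reuse a pooled connection. A minimal standalone sketch of that pattern follows; the bucket name, object keys, and helper names (download_one, download_all) are illustrative, not part of this patch:

import asyncio
import concurrent.futures

import boto3
from botocore.config import Config

# Illustrative values; the patch takes the real ones from Medusa's [storage] config.
CONCURRENT_TRANSFERS = 16
BUCKET = 'example-bucket'                 # hypothetical bucket name
KEYS = ['sstable-1.db', 'sstable-2.db']   # hypothetical object keys

# One client shared by all worker threads; boto3 clients are thread-safe.
# max_pool_connections matches the executor size so every concurrent
# transfer can reuse a pooled, kept-alive connection.
s3 = boto3.client('s3', config=Config(max_pool_connections=CONCURRENT_TRANSFERS))
executor = concurrent.futures.ThreadPoolExecutor(CONCURRENT_TRANSFERS)

def download_one(key, dest):
    # Blocking boto3 call, executed on an executor thread.
    s3.download_file(Bucket=BUCKET, Key=key, Filename=dest)

async def download_all():
    loop = asyncio.get_event_loop()
    # Wrap each blocking download in a future; the event loop stays free
    # while the executor threads drive the transfers in parallel.
    futures = [loop.run_in_executor(executor, download_one, key, '/tmp/' + key)
               for key in KEYS]
    await asyncio.gather(*futures)

asyncio.run(download_all())
executor.shutdown()

Sizing the executor and the connection pool from the same concurrent_transfers value keeps the two limits aligned; whichever one is smaller would otherwise cap the effective parallelism.
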
diff --git a/tests/resources/config/medusa-ibm_storage.ini b/tests/resources/config/medusa-ibm_storage.ini
index 85ab432eb..322028b6a 100644
--- a/tests/resources/config/medusa-ibm_storage.ini
+++ b/tests/resources/config/medusa-ibm_storage.ini
@@ -17,6 +17,7 @@ transfer_max_bandwidth = "1MB/s"
 secure = True
 backup_grace_period_in_days = 0
 max_backup_count = 1
+concurrent_transfers = 16
 
 [monitoring]
 monitoring_provider = local
diff --git a/tests/resources/config/medusa-minio.ini b/tests/resources/config/medusa-minio.ini
index a83bcf6f5..44c8b3141 100644
--- a/tests/resources/config/medusa-minio.ini
+++ b/tests/resources/config/medusa-minio.ini
@@ -20,7 +20,7 @@ host = localhost
 port = 9000
 backup_grace_period_in_days = 0
 max_backup_count = 1
-concurrent_transfers = 4
+concurrent_transfers = 16
 
 [monitoring]
 monitoring_provider = local
diff --git a/tests/resources/config/medusa-s3_us_west_oregon.ini b/tests/resources/config/medusa-s3_us_west_oregon.ini
index 999805785..b99e3ae4e 100644
--- a/tests/resources/config/medusa-s3_us_west_oregon.ini
+++ b/tests/resources/config/medusa-s3_us_west_oregon.ini
@@ -12,7 +12,7 @@ fqdn = 127.0.0.1
 prefix = storage_prefix
 multi_part_upload_threshold = 1024
 aws_cli_path = aws
-concurrent_transfers = 4
+concurrent_transfers = 16
 backup_grace_period_in_days = 0
 max_backup_count = 1
 region = us-west-2
diff --git a/tests/resources/config/medusa-s3_us_west_oregon_encrypted.ini b/tests/resources/config/medusa-s3_us_west_oregon_encrypted.ini
index 66f69c85e..be707f38a 100644
--- a/tests/resources/config/medusa-s3_us_west_oregon_encrypted.ini
+++ b/tests/resources/config/medusa-s3_us_west_oregon_encrypted.ini
@@ -12,7 +12,7 @@ fqdn = 127.0.0.1
 prefix = storage_prefix
 multi_part_upload_threshold = 1024
 aws_cli_path = aws
-concurrent_transfers = 4
+concurrent_transfers = 16
 backup_grace_period_in_days = 0
 max_backup_count = 1
 kms_id = 939b70ee-a65e-46af-aecf-a20ef6a457b7
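
The test configs above all raise concurrent_transfers from 4 to 16 (or add it where it was missing); that single setting now sizes both the ThreadPoolExecutor and the client's connection pool. For reference, the new synchronous __upload_file helper boils down to the sketch below, assuming a stand-in ManifestObject namedtuple (the real class, with its actual field names, is defined elsewhere in medusa):

import collections

# Stand-in for medusa's ManifestObject; the field names here are assumed.
ManifestObject = collections.namedtuple('ManifestObject', ['path', 'size', 'MD5'])

def upload_and_stat(s3_client, bucket, src, key):
    # upload_file blocks until the (possibly multi-part) transfer completes,
    # which is exactly why the patch runs it on an executor thread.
    s3_client.upload_file(Filename=src, Bucket=bucket, Key=key)
    # A follow-up HEAD request returns the stored size and ETag, replacing
    # the await of the async _stat_blob helper inside a plain function.
    resp = s3_client.head_object(Bucket=bucket, Key=key)
    return ManifestObject(key, int(resp['ContentLength']), resp['ETag'].replace('"', ''))
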