[S3 Storage] Use boto's connection pool so transfers are truly parallel
rzvoncek committed Oct 26, 2023
1 parent db75271 commit 7aac1de
Showing 5 changed files with 46 additions and 16 deletions.
55 changes: 42 additions & 13 deletions medusa/storage/s3_base_storage.py
@@ -13,10 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import base64
import boto3
import botocore.session
import concurrent.futures
import logging
import io
import os
@@ -114,6 +115,8 @@ def __init__(self, config):
self.connection_extra_args = self._make_connection_arguments(config)
self.transfer_config = self._make_transfer_config(config)

self.executor = concurrent.futures.ThreadPoolExecutor(int(config.concurrent_transfers))

super().__init__(config)

def connect(self):
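The `__init__` hunk above sizes a thread pool from the `concurrent_transfers` setting, one worker per allowed simultaneous transfer. A minimal sketch of that idea, with the config access reduced to a plain integer (illustrative only, not Medusa's actual config object):

    import concurrent.futures

    concurrent_transfers = 16  # in Medusa this value comes from the storage configuration

    # one worker per allowed concurrent transfer; each worker runs one blocking boto3 call at a time
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_transfers)
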
@@ -125,7 +128,8 @@ def connect(self):
boto_config = Config(
region_name=self.credentials.region,
signature_version='v4',
tcp_keepalive=True
tcp_keepalive=True,
max_pool_connections=int(self.config.concurrent_transfers)
)
self.s3_client = boto3.client(
's3',
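The `Config` above also raises `max_pool_connections` to the same value; botocore's connection pool defaults to 10 connections, so without this the extra executor threads would contend for pooled connections instead of transferring in parallel. A hedged, standalone sketch of a client built that way (region and credentials handling are simplified here):

    import boto3
    from botocore.config import Config

    concurrent_transfers = 16

    boto_config = Config(
        region_name='us-west-2',                     # illustrative region
        signature_version='v4',
        tcp_keepalive=True,
        max_pool_connections=concurrent_transfers,   # keep the pool at least as large as the executor
    )
    s3_client = boto3.client('s3', config=boto_config)
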
@@ -139,6 +143,7 @@ def disconnect(self):
logging.debug('Disconnecting from S3...')
try:
self.s3_client.close()
self.executor.shutdown()
except Exception as e:
logging.error('Error disconnecting from S3: {}'.format(e))
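`disconnect` now also shuts the executor down, so worker threads are joined alongside closing the client. A sketch of the equivalent cleanup (the `wait=True` default is spelled out for clarity):

    # drain any in-flight transfer threads before dropping the S3 client
    executor.shutdown(wait=True)
    s3_client.close()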

@@ -260,7 +265,15 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _download_blob(self, src: str, dest: str):
blob = await self._stat_blob(src)
# boto has a connection pool, but it does not support the asyncio API
# so we make things ugly and submit the whole download as a task to an executor
# which allows us to download several files in parallel
loop = asyncio.get_event_loop()
future = loop.run_in_executor(self.executor, self.__download_blob, src, dest)
await future

def __download_blob(self, src: str, dest: str):
blob = self.__stat_blob(src)
object_key = blob.name

# we must make sure the blob gets stored under sub-folder (if there is any)
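As the comment in the hunk explains, boto3 is synchronous, so the blocking download is pushed onto the executor with `run_in_executor` and awaited; several such futures can then run at once without blocking the event loop. A self-contained sketch of the same pattern (bucket name, keys and destinations are placeholders):

    import asyncio
    import concurrent.futures
    import boto3

    s3_client = boto3.client('s3')
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)

    def download_one(key: str, dest: str):
        # blocking boto3 call; runs inside an executor thread
        s3_client.download_file(Bucket='example-bucket', Key=key, Filename=dest)

    async def download_many(pairs):
        loop = asyncio.get_event_loop()
        futures = [loop.run_in_executor(executor, download_one, key, dest)
                   for key, dest in pairs]
        await asyncio.gather(*futures)

    # asyncio.run(download_many([('backups/node1/manifest.json', '/tmp/manifest.json')]))

With `max_pool_connections` matching the executor size, each worker thread can hold its own pooled connection, which is what makes the transfers genuinely parallel.
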
@@ -304,6 +317,11 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
logging.error("An error occurred:", e)
logging.error('Error getting object from s3://{}/{}'.format(self.bucket_name, object_key))

def __stat_blob(self, key):
resp = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
item_hash = resp['ETag'].replace('"', '')
return AbstractBlob(key, int(resp['ContentLength']), item_hash, resp['LastModified'])

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
src_chunks = src.split('/')
@@ -328,18 +346,29 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
)
)

self.s3_client.upload_file(
Filename=src,
Bucket=self.bucket_name,
Key=object_key,
Config=self.transfer_config,
ExtraArgs=kms_args,
)

blob = await self._stat_blob(object_key)
mo = ManifestObject(blob.name, blob.size, blob.hash)
upload_conf = {
'Filename': src,
'Bucket': self.bucket_name,
'Key': object_key,
'Config': self.transfer_config,
'ExtraArgs': kms_args,
}
# we are going to combine asyncio with boto's threading
# we do this by submitting the upload into an executor
loop = asyncio.get_event_loop()
future = loop.run_in_executor(self.executor, self.__upload_file, upload_conf)
# and then ask asyncio to yield until it completes
mo = await future
return mo

def __upload_file(self, upload_conf):
self.s3_client.upload_file(**upload_conf)
resp = self.s3_client.head_object(Bucket=upload_conf['Bucket'], Key=upload_conf['Key'])
blob_name = upload_conf['Key']
blob_size = int(resp['ContentLength'])
blob_hash = resp['ETag'].replace('"', '')
return ManifestObject(blob_name, blob_size, blob_hash)

async def _get_object(self, object_key: t.Union[Path, str]) -> AbstractBlob:
blob = await self._stat_blob(str(object_key))
return blob
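`_upload_blob` follows the same recipe in the other direction: the `upload_file` keyword arguments are collected into a dict, the blocking upload runs on the executor, and the `ManifestObject` is built from a follow-up `head_object`. A standalone sketch of that flow (bucket, threshold and the empty `ExtraArgs` are placeholders; Medusa would pass KMS arguments there when configured):

    import asyncio
    import concurrent.futures
    import boto3
    from boto3.s3.transfer import TransferConfig

    s3_client = boto3.client('s3')
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
    transfer_config = TransferConfig(multipart_threshold=1024 * 1024)

    def upload_one(upload_conf: dict) -> dict:
        # blocking upload, then a HEAD request to learn the stored size and ETag
        s3_client.upload_file(**upload_conf)
        resp = s3_client.head_object(Bucket=upload_conf['Bucket'], Key=upload_conf['Key'])
        return {
            'name': upload_conf['Key'],
            'size': int(resp['ContentLength']),
            'hash': resp['ETag'].replace('"', ''),   # S3 returns the ETag wrapped in quotes
        }

    async def upload_blob(src: str, object_key: str) -> dict:
        upload_conf = {
            'Filename': src,
            'Bucket': 'example-bucket',
            'Key': object_key,
            'Config': transfer_config,
            'ExtraArgs': {},   # server-side encryption / KMS arguments would go here
        }
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(executor, upload_one, upload_conf)
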
1 change: 1 addition & 0 deletions tests/resources/config/medusa-ibm_storage.ini
@@ -17,6 +17,7 @@ transfer_max_bandwidth = "1MB/s"
secure = True
backup_grace_period_in_days = 0
max_backup_count = 1
concurrent_transfers = 16

[monitoring]
monitoring_provider = local
2 changes: 1 addition & 1 deletion tests/resources/config/medusa-minio.ini
@@ -20,7 +20,7 @@ host = localhost
port = 9000
backup_grace_period_in_days = 0
max_backup_count = 1
concurrent_transfers = 4
concurrent_transfers = 16

[monitoring]
monitoring_provider = local
2 changes: 1 addition & 1 deletion tests/resources/config/medusa-s3_us_west_oregon.ini
@@ -12,7 +12,7 @@ fqdn = 127.0.0.1
prefix = storage_prefix
multi_part_upload_threshold = 1024
aws_cli_path = aws
concurrent_transfers = 4
concurrent_transfers = 16
backup_grace_period_in_days = 0
max_backup_count = 1
region = us-west-2
@@ -12,7 +12,7 @@ fqdn = 127.0.0.1
prefix = storage_prefix
multi_part_upload_threshold = 1024
aws_cli_path = aws
concurrent_transfers = 4
concurrent_transfers = 16
backup_grace_period_in_days = 0
max_backup_count = 1
kms_id = 939b70ee-a65e-46af-aecf-a20ef6a457b7
