Skip to content

Commit

Permalink
[Local Storage] Prevent OOM errors by processing files by chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
rzvoncek committed Oct 25, 2023
1 parent 98f6d63 commit 255cf44
Showing 1 changed file with 35 additions and 6 deletions.
41 changes: 35 additions & 6 deletions medusa/storage/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from medusa.storage.abstract_storage import AbstractStorage, AbstractBlob, ManifestObject, ObjectDoesNotExistError


# Size in bytes (4 MiB) of each chunk requested from read() when files are
# hashed or copied below, so a whole file is never held in memory at once.
BUFFER_SIZE = 4 * 1024 * 1024


class LocalStorage(AbstractStorage):

def __init__(self, config):
Expand Down Expand Up @@ -63,17 +66,31 @@ async def _list_blobs(self, prefix=None):

def _md5(self, file_path: str) -> str:
    """Return the hexadecimal MD5 digest of the file at ``file_path``.

    The file is read in ``BUFFER_SIZE``-byte chunks that are fed to the hash
    incrementally, so only one chunk is held in memory at a time.

    :param file_path: path of the file to hash; opened in binary mode.
    :return: the MD5 digest as a hex string.
    """
    md5 = hashlib.md5()
    with open(file_path, 'rb') as f:
        while True:
            data = f.read(BUFFER_SIZE)
            if not data:
                # An empty read signals end of file.
                break
            md5.update(data)
    return md5.hexdigest()

async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dict[str, str]) -> AbstractBlob:
    """Write ``data`` to ``root_dir / object_key`` and describe the result.

    The stream is copied in ``BUFFER_SIZE``-byte chunks, and the MD5 digest is
    computed over the same chunks as they are written, so the payload is
    neither loaded into memory at once nor re-read from disk for hashing.

    :param data: readable binary stream holding the object's content.
    :param object_key: path of the object relative to ``self.root_dir``.
    :param headers: accepted for interface compatibility; not used here.
    :return: an ``AbstractBlob`` built from the key, the written file's size,
        the MD5 hex digest of the content, and the file's modification time.
    """
    object_path = self.root_dir / object_key
    # Make sure the destination directory exists before opening the file.
    object_path.parent.mkdir(parents=True, exist_ok=True)

    md5 = hashlib.md5()
    with open(object_path, 'wb') as f:
        while True:
            chunk = data.read(BUFFER_SIZE)
            if not chunk:
                # An empty read signals the stream is exhausted.
                break
            md5.update(chunk)
            f.write(chunk)

    # Stat once, after the file is closed, so size and mtime come from the
    # same snapshot of the fully flushed file.
    stat_result = os.stat(object_path)
    return AbstractBlob(
        object_key,
        stat_result.st_size,
        md5.hexdigest(),
        datetime.datetime.fromtimestamp(stat_result.st_mtime)
    )

Expand All @@ -95,7 +112,11 @@ async def _download_blob(self, src: str, dest: str):

with open(src_file, 'rb') as f:
with open(dest_file, 'wb') as d:
d.write(f.read())
while True:

Check warning on line 115 in medusa/storage/local_storage.py

View check run for this annotation

Codecov / codecov/patch

medusa/storage/local_storage.py#L115

Added line #L115 was not covered by tests
data = f.read(BUFFER_SIZE)
if not data:
break
d.write(data)

async def _upload_blob(self, src: str, dest: str) -> ManifestObject:

Expand All @@ -119,14 +140,22 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
# remove root_dir from dest_file name
dest_object_key = str(dest_file.relative_to(str(self.root_dir)))

md5 = hashlib.md5()

with open(src_file, 'rb') as f:
dest_file.parent.mkdir(parents=True, exist_ok=True)
with open(dest_file, 'wb') as d:
d.write(f.read())
while True:

Check warning on line 148 in medusa/storage/local_storage.py

View check run for this annotation

Codecov / codecov/patch

medusa/storage/local_storage.py#L148

Added line #L148 was not covered by tests
data = f.read(BUFFER_SIZE)
if not data:
break
d.write(data)
md5.update(data)

return ManifestObject(
dest_object_key,
os.stat(dest_file).st_size,
self._md5(dest_file),
md5.hexdigest(),
)

async def _get_object(self, object_key: t.Union[Path, str]) -> AbstractBlob:
Expand Down

0 comments on commit 255cf44

Please sign in to comment.