Skip to content

Commit

Permalink
Add 'last modified' to S3 object (Netflix#778)
Browse files Browse the repository at this point in the history
* last_modified to S3Object

* run black
  • Loading branch information
akyrola authored Nov 17, 2021
1 parent 0f09a1d commit 584b7e1
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
25 changes: 22 additions & 3 deletions metaflow/datatools/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(
content_type=None,
metadata=None,
range_info=None,
last_modified=None,
):

# all fields of S3Object should return a unicode object
Expand All @@ -107,6 +108,7 @@ def __init__(
self._path = path
self._key = None
self._content_type = content_type
self._last_modified = last_modified

self._metadata = None
if metadata is not None and "metaflow-user-attributes" in metadata:
Expand Down Expand Up @@ -237,6 +239,14 @@ def range_info(self):
"""
return self._range_info

@property
def last_modified(self):
"""
Returns the time the object was last modified, expressed as a unix
timestamp, or None if that information was not fetched for this
object.

The value is exactly what was handed to the constructor through its
`last_modified` argument (which defaults to None); this property
does not contact S3 or convert the value in any way.
"""
return self._last_modified

def __str__(self):
if self._path:
return "<S3Object %s (%d bytes, local)>" % (self._url, self._size)
Expand Down Expand Up @@ -486,6 +496,7 @@ def _info(s3, tmp):
"content_type": resp["ContentType"],
"metadata": resp["Metadata"],
"size": resp["ContentLength"],
"last_modified": resp["LastModified"].timestamp(),
}

info_results = None
Expand All @@ -504,6 +515,7 @@ def _info(s3, tmp):
size=info_results["size"],
content_type=info_results["content_type"],
metadata=info_results["metadata"],
last_modified=info_results["last_modified"],
)
return S3Object(self._s3root, url, None)

Expand Down Expand Up @@ -547,7 +559,7 @@ def _head():
else:
yield self._s3root, s3url, None, info["size"], info[
"content_type"
], info["metadata"]
], info["metadata"], None, info["last_modified"]
else:
# This should not happen; we should always get a response
# even if it contains an error inside it
Expand Down Expand Up @@ -593,6 +605,7 @@ def _download(s3, tmp):
return {
"content_type": resp["ContentType"],
"metadata": resp["Metadata"],
"last_modified": resp["LastModified"].timestamp(),
}
return None

Expand All @@ -611,6 +624,7 @@ def _download(s3, tmp):
path,
content_type=addl_info["content_type"],
metadata=addl_info["metadata"],
last_modified=addl_info["last_modified"],
)
return S3Object(self._s3root, url, path)

Expand Down Expand Up @@ -652,7 +666,9 @@ def _get():
info = json.load(f)
yield self._s3root, s3url, os.path.join(
self._tmpdir, fname
), None, info["content_type"], info["metadata"]
), None, info["content_type"], info["metadata"], None, info[
"last_modified"
]
else:
yield self._s3root, s3prefix, None
else:
Expand Down Expand Up @@ -694,7 +710,9 @@ def _get():
info = json.load(f)
yield self._s3root, s3url, os.path.join(
self._tmpdir, fname
), None, info["content_type"], info["metadata"]
), None, info["content_type"], info["metadata"], None, info[
"last_modified"
]
else:
yield s3prefix, s3url, os.path.join(self._tmpdir, fname)

Expand Down Expand Up @@ -1023,6 +1041,7 @@ def _s3op_with_retries(self, mode, **options):
raise MetaflowS3NotFound(err_out)
elif ex.returncode == s3op.ERROR_URL_ACCESS_DENIED:
raise MetaflowS3AccessDenied(err_out)
print("Error with S3 operation:", err_out)
time.sleep(2 ** i + random.randint(0, 10))

return None, err_out
4 changes: 4 additions & 0 deletions metaflow/datatools/s3op.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def op_info(url):
"size": head["ContentLength"],
"content_type": head["ContentType"],
"metadata": head["Metadata"],
"last_modified": head["LastModified"].timestamp(),
}
except client_error as err:
error_code = normalize_client_error(err)
Expand Down Expand Up @@ -183,12 +184,15 @@ def op_info(url):
# TODO specific error message for out of disk space
# If we need the metadata, get it and write it out
if pre_op_info:

with open("%s_meta" % url.local, mode="w") as f:
args = {"size": resp["ContentLength"]}
if resp["ContentType"]:
args["content_type"] = resp["ContentType"]
if resp["Metadata"] is not None:
args["metadata"] = resp["Metadata"]
if resp["LastModified"]:
args["last_modified"] = resp["LastModified"].timestamp()
json.dump(args, f)
# Finally, we push out the size to the result_pipe since
# the size is used for verification and other purposes and
Expand Down

0 comments on commit 584b7e1

Please sign in to comment.