
Commit

Fix anonymous reading in materialize and add rate limited logging. (#898)
* Fix anonymous reading in materialize and add rate limited logging.

* In materialize, try reading with the configured credentials; if that fails, fall back to
  reading anonymously when anonymous access appears to work (a standalone sketch of this
  fallback pattern follows this list).
* Add rate-limited logging to reading via materialize in local mode.
* Check for a missing root before checking whether the node will be a source, since that
  ordering makes more sense.
* Switch ntsb_loader_materialized.py over to reading in local mode; it was already working
  (with the anonymous fix) but was very slow, hence the logging.
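For readers skimming the diff, here is a minimal standalone sketch of the credentialed-then-anonymous fallback that the new _maybe_anonymous method below implements with pyarrow's S3FileSystem. The helper name open_s3_maybe_anonymous and its logging are illustrative only, not part of the change; the probe-then-retry flow is the point.

    from pyarrow.fs import S3FileSystem


    def open_s3_maybe_anonymous(path: str) -> S3FileSystem:
        """Probe `path` with the default credentials; fall back to anonymous access on failure.

        `path` is a bucket/key string, as pyarrow expects (no s3:// scheme).
        """
        fs = S3FileSystem()
        try:
            fs.get_file_info(path)  # cheap probe; raises OSError when access is denied
            return fs
        except OSError as e:
            print(f"Credentialed access failed ({e}); retrying anonymously")

        anon_fs = S3FileSystem(anonymous=True)
        anon_fs.get_file_info(path)  # raises again if the bucket is not publicly readable
        return anon_fs


    # Example with the public bucket the loader script reads from:
    # fs = open_s3_maybe_anonymous("aryn-public/materialize/examples/luna/ntsb_loader_2024-08-29")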
eric-anderson authored Oct 10, 2024
1 parent 1b102e3 commit 0e9fd44
Showing 4 changed files with 102 additions and 7 deletions.
12 changes: 7 additions & 5 deletions examples/query/ntsb_loader_materialized.py
@@ -73,9 +73,11 @@
 }


-context = sycamore.init()
-_ = context.read.materialize("s3://aryn-public/materialize/examples/luna/ntsb_loader_2024-08-29").write.opensearch(
-    os_client_args=os_client_args,
-    index_name=INDEX,
-    index_settings=index_settings,
+context = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)
+(
+    context.read.materialize("s3://aryn-public/materialize/examples/luna/ntsb_loader_2024-08-29").write.opensearch(
+        os_client_args=os_client_args,
+        index_name=INDEX,
+        index_settings=index_settings,
+    )
 )
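As a side note on the exec_mode change above, a minimal sketch of the two ways of initializing a context, assuming only the sycamore.init / sycamore.EXEC_LOCAL usage visible in this diff (Ray as the default executor reflects typical behavior, not something this commit changes):

    import sycamore

    # Default initialization; in a typical install this runs the pipeline on the distributed (Ray) executor.
    distributed_context = sycamore.init()

    # Local mode runs operators in-process, which is what the loader script now uses;
    # it avoids starting a distributed runtime for a straightforward read-and-write job.
    local_context = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)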
40 changes: 38 additions & 2 deletions lib/sycamore/sycamore/materialize.py
@@ -95,17 +95,45 @@ def __init__(

         super().__init__(child, **kwargs)

+        self._maybe_anonymous()
+
+    def _maybe_anonymous(self):
+        if self._root is None:
+            return
+        from pyarrow.fs import S3FileSystem
+
+        if not isinstance(self._fs, S3FileSystem):
+            return
+
+        try:
+            self._fs.get_file_info(str(self._root))
+            return
+        except OSError as e:
+            logging.warning(f"Got error {e} trying to get file info on {self._root}, trying again in anonymous mode")
+
+        fs = S3FileSystem(anonymous=True)
+        try:
+            fs.get_file_info(str(self._root))
+            self._fs = fs
+            self._fshelper = _PyArrowFsHelper(self._fs)
+            return
+        except OSError as e:
+            logging.warning(
+                f"Got error {e} trying to anonymously get file info on {self._root}. Likely to fail shortly."
+            )
+            return
+
     def prepare(self):
         """
         Clean up the materialize location if necessary.
         Validate that cleaning worked, but only once all materializes have finished cleaning.
         This protects against multiple materializes pointing to the same location.
         """

-        if self._will_be_source():
+        if self._root is None:
             return

-        if self._root is None:
+        if self._will_be_source():
             return

         if not self._clean_root:
@@ -207,13 +235,21 @@ def local_source(self) -> list[Document]:
logger.info(f"Using {self._orig_path} as cached source of data")
if not self._fshelper.file_exists(self._success_path()):
logging.warning(f"materialize.success not found in {self._orig_path}. Returning partial data")
from sycamore.utils.sycamore_logger import LoggerFilter

limited_logger = logging.getLogger(__name__ + ".limited_local_source")
limited_logger.addFilter(LoggerFilter())
ret = []
count = 0
for fi in self._fshelper.list_files(self._root):
n = Path(fi.path)
if n.suffix == ".pickle":
limited_logger.info(f" reading file {count} from {str(n)}")
count = count + 1
f = self._fs.open_input_stream(str(n))
ret.append(Document.deserialize(f.read()))
f.close()
logger.info(f" read {count} total files")

return ret

28 changes: 28 additions & 0 deletions lib/sycamore/sycamore/tests/unit/utils/test_sycamore_logger.py
@@ -0,0 +1,28 @@
import logging
import time

from sycamore.utils.sycamore_logger import LoggerFilter


def test_logger_ratelimit(caplog):
logger = logging.getLogger("test_sycamore")

with caplog.at_level(logging.INFO):
for i in range(5):
logger.info(f"Unbounded {i}")

logger.addFilter(LoggerFilter())
for i in range(5):
logger.info(f"Bounded {i}")

time.sleep(1)
logger.info("Bounded After")

for i in range(5):
assert f"Unbounded {i}\n" in caplog.text

assert "Bounded 0" in caplog.text
for i in range(1, 5):
assert f"Bounded {i}\n" not in caplog.text

assert "Bounded After\n" in caplog.text
29 changes: 29 additions & 0 deletions lib/sycamore/sycamore/utils/sycamore_logger.py
@@ -2,6 +2,8 @@
import sys
import time

from datetime import datetime, timedelta

handler_setup = False


@@ -27,3 +29,30 @@ def get_logger():
"""Get an application logger"""
logger = logging.getLogger("sycamore")
return logger


class LoggerFilter(logging.Filter):
def __init__(self, seconds=1):
"""
A filter limit the rate of log messages.
logger = logging.getLogger(__name__)
logger.setFilter(LoggerFilter())
Args:
seconds: Minimum seconds between log messages.
"""

self._min_interval = timedelta(seconds=seconds)
self._next_log = datetime.now()

def filter(self, record=None):
if record is None:
assert False
now = datetime.now()
if now >= self._next_log:
self._next_log = now + self._min_interval
return True

return False
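
A short usage sketch of the new filter (logger name and messages are arbitrary): attach it with addFilter and only the first message in each one-second window gets through, as the unit test above also demonstrates.

    import logging
    import time

    from sycamore.utils.sycamore_logger import LoggerFilter

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("rate_limited_demo")
    logger.addFilter(LoggerFilter(seconds=1))

    for i in range(5):
        logger.info("progress %d", i)  # only the first of these is emitted

    time.sleep(1)
    logger.info("after the interval")  # emitted again, since the interval has elapsed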
