Skip to content

Commit

Permalink
Allow reassigning ErrorMonitor.on_error_callback
Browse files Browse the repository at this point in the history
  • Loading branch information
kgodlewski authored and PatrykGala committed Jan 14, 2025
1 parent 716d01f commit cdd03e8
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion src/neptune_scale/sync/errors_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def default_warning_callback(error: BaseException, last_seen_at: Optional[float]


class ErrorsMonitor(Daemon, Resource):
"""A thread that consumes messages from the provided queue, and calls user callbacks based on the error type."""

def __init__(
self,
errors_queue: ErrorsQueue,
Expand Down Expand Up @@ -100,6 +102,15 @@ def get_next(self) -> Optional[BaseException]:
# TODO: we should synchronize here properly instead
return None

@property
def on_error_callback(self) -> Callable[[BaseException, Optional[float]], None]:
return self._on_error_callback

@on_error_callback.setter
def on_error_callback(self, callback: Optional[Callable[[BaseException, Optional[float]], None]]) -> None:
with self._wait_condition:
self._on_error_callback = callback or default_error_callback

def work(self) -> None:
while (error := self.get_next()) is not None:
last_raised_at = self._last_raised_timestamps.get(type(error), None)
Expand All @@ -119,7 +130,12 @@ def work(self) -> None:
elif isinstance(error, NeptuneRetryableError):
self._on_warning_callback(error, last_raised_at)
elif isinstance(error, NeptuneScaleError):
self._on_error_callback(error, last_raised_at)
# Allow swapping the error callback while we're working, but don't hold
# the lock during callback execution
with self._wait_condition:
error_callback = self._on_error_callback

error_callback(error, last_raised_at)
else:
self._on_error_callback(NeptuneUnexpectedError(reason=str(error)), last_raised_at)
except Exception as e:
Expand Down

0 comments on commit cdd03e8

Please sign in to comment.