From cdd03e8b8c2995803f21cf6fc721921d851af54e Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Fri, 10 Jan 2025 15:45:59 +0100 Subject: [PATCH] Allow reassigning `ErrorMonitor.on_error_callback` --- src/neptune_scale/sync/errors_tracking.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/neptune_scale/sync/errors_tracking.py b/src/neptune_scale/sync/errors_tracking.py index 8fadecd..29fa69c 100644 --- a/src/neptune_scale/sync/errors_tracking.py +++ b/src/neptune_scale/sync/errors_tracking.py @@ -63,6 +63,8 @@ def default_warning_callback(error: BaseException, last_seen_at: Optional[float] class ErrorsMonitor(Daemon, Resource): + """A thread that consumes messages from the provided queue, and calls user callbacks based on the error type.""" + def __init__( self, errors_queue: ErrorsQueue, @@ -100,6 +102,15 @@ def get_next(self) -> Optional[BaseException]: # TODO: we should synchronize here properly instead return None + @property + def on_error_callback(self) -> Callable[[BaseException, Optional[float]], None]: + return self._on_error_callback + + @on_error_callback.setter + def on_error_callback(self, callback: Optional[Callable[[BaseException, Optional[float]], None]]) -> None: + with self._wait_condition: + self._on_error_callback = callback or default_error_callback + def work(self) -> None: while (error := self.get_next()) is not None: last_raised_at = self._last_raised_timestamps.get(type(error), None) @@ -119,7 +130,12 @@ def work(self) -> None: elif isinstance(error, NeptuneRetryableError): self._on_warning_callback(error, last_raised_at) elif isinstance(error, NeptuneScaleError): - self._on_error_callback(error, last_raised_at) + # Allow swapping the error callback while we're working, but don't hold + # the lock during callback execution + with self._wait_condition: + error_callback = self._on_error_callback + + error_callback(error, last_raised_at) else: self._on_error_callback(NeptuneUnexpectedError(reason=str(error)), last_raised_at) except Exception as e: