diff --git a/src/neptune_scale/core/components/sync_process.py b/src/neptune_scale/core/components/sync_process.py index a620c452..3fffdfea 100644 --- a/src/neptune_scale/core/components/sync_process.py +++ b/src/neptune_scale/core/components/sync_process.py @@ -55,7 +55,6 @@ def __init__( self._state: SyncProcessState = SyncProcessState.INIT def run(self) -> None: - # TODO: Think of maxsize setting to internal queue size worker = SyncProcessWorker( family=self._family, api_token=self._api_token, @@ -69,7 +68,6 @@ def run(self) -> None: try: worker.join() except KeyboardInterrupt: - logger.info("Ending...") worker.terminate() worker.join() worker.close() @@ -136,7 +134,7 @@ def join(self) -> None: thread.join() -class ExternalToInternalOperationsThread(Daemon): +class ExternalToInternalOperationsThread(Daemon, Resource): def __init__( self, external: multiprocessing.Queue[QueueElement], @@ -192,7 +190,12 @@ def __init__( self._last_put_seq: Synchronized[int] = last_put_seq self._last_put_seq_wait: Condition = last_put_seq_wait + self._latest_unprocessed: QueueElement | None = None + def get_next(self) -> QueueElement | None: + if self._latest_unprocessed is not None: + return self._latest_unprocessed + try: return self._operations_queue.get_nowait() except queue.Empty: @@ -207,23 +210,23 @@ def submit(self, *, operation: RunOperation) -> None: self._errors_queue.put(e) def work(self) -> None: - # TODO: Batching while (operation := self.get_next()) is not None: + self._latest_unprocessed = operation sequence_id, occured_at, data = operation - # TODO: Check for lag exceeded - try: run_operation = RunOperation() run_operation.ParseFromString(data) except Exception as e: self._errors_queue.put(e) + # We're skipping to the next operation + self._latest_unprocessed = None continue - # self.submit(operation=run_operation) + self.submit(operation=run_operation) + self._latest_unprocessed = None - # Update Last PUT sequence id and notify main process + # Update Last PUT sequence id and notify threads in the main process with self._last_put_seq_wait: - with self._last_put_seq.get_lock(): - self._last_put_seq.value = sequence_id + self._last_put_seq.value = sequence_id self._last_put_seq_wait.notify_all()