Skip to content

Commit

Permalink
Sync thread fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky committed Aug 16, 2024
1 parent 3109d9b commit 1e1a056
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions src/neptune_scale/core/components/sync_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def __init__(
self._state: SyncProcessState = SyncProcessState.INIT

def run(self) -> None:
# TODO: Think of maxsize setting to internal queue size
worker = SyncProcessWorker(
family=self._family,
api_token=self._api_token,
Expand All @@ -69,7 +68,6 @@ def run(self) -> None:
try:
worker.join()
except KeyboardInterrupt:
logger.info("Ending...")
worker.terminate()
worker.join()
worker.close()
Expand Down Expand Up @@ -136,7 +134,7 @@ def join(self) -> None:
thread.join()


class ExternalToInternalOperationsThread(Daemon):
class ExternalToInternalOperationsThread(Daemon, Resource):
def __init__(
self,
external: multiprocessing.Queue[QueueElement],
Expand Down Expand Up @@ -192,7 +190,12 @@ def __init__(
self._last_put_seq: Synchronized[int] = last_put_seq
self._last_put_seq_wait: Condition = last_put_seq_wait

self._latest_unprocessed: QueueElement | None = None

def get_next(self) -> QueueElement | None:
if self._latest_unprocessed is not None:
return self._latest_unprocessed

try:
return self._operations_queue.get_nowait()
except queue.Empty:
Expand All @@ -207,23 +210,23 @@ def submit(self, *, operation: RunOperation) -> None:
self._errors_queue.put(e)

def work(self) -> None:
# TODO: Batching
while (operation := self.get_next()) is not None:
self._latest_unprocessed = operation
sequence_id, occured_at, data = operation

# TODO: Check for lag exceeded

try:
run_operation = RunOperation()
run_operation.ParseFromString(data)
except Exception as e:
self._errors_queue.put(e)
# We're skipping to the next operation
self._latest_unprocessed = None
continue

# self.submit(operation=run_operation)
self.submit(operation=run_operation)
self._latest_unprocessed = None

# Update Last PUT sequence id and notify main process
# Update Last PUT sequence id and notify threads in the main process
with self._last_put_seq_wait:
with self._last_put_seq.get_lock():
self._last_put_seq.value = sequence_id
self._last_put_seq.value = sequence_id
self._last_put_seq_wait.notify_all()

0 comments on commit 1e1a056

Please sign in to comment.