Skip to content

Commit

Permalink
Refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky committed Aug 21, 2024
1 parent 52a2968 commit d65985c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 70 deletions.
128 changes: 59 additions & 69 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,55 +329,80 @@ def log(
for operation in splitter:
self._operations_queue.enqueue(operation=operation)

def wait_for_submission(self, timeout: Optional[float] = None) -> None:
"""
Waits until all metadata is submitted to Neptune.
def _wait(
self,
phrase: str,
sleep_time: float,
wait_condition: ConditionT,
external_value: Synchronized[int],
timeout: Optional[float] = None,
verbose: bool = True,
) -> None:
if verbose:
logger.info(f"Waiting for all operations to be {phrase}")

Args:
timeout: Maximum time to wait for submission
"""
begin_time = time.time()
logger.info("Waiting for all operations to be submitted")
if timeout is None:
if timeout is None and verbose:
logger.warning("No timeout specified. Waiting indefinitely")

with self._lock:
if not self._sync_process.is_alive():
logger.warning("Sync process is not running")
if verbose:
logger.warning("Sync process is not running")
return # No need to wait if the sync process is not running

sleep_time_wait = (
min(MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, timeout) if timeout is not None else MINIMAL_WAIT_FOR_PUT_SLEEP_TIME
)
begin_time = time.time()
wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time
last_queued_sequence_id = self._operations_queue.last_sequence_id
last_message_printed: Optional[float] = None

while True:
with self._last_put_seq_wait:
self._last_put_seq_wait.wait(timeout=sleep_time_wait)
value = self._last_put_seq.value
with wait_condition:
wait_condition.wait(timeout=wait_time)
value = external_value.value

if value == -1:
if self._operations_queue.last_sequence_id != -1:
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY:
if verbose and should_print_message(last_message_printed):
last_message_printed = time.time()
logger.info(
"Waiting. No operations were submitted yet. Operations to sync: %s",
f"Waiting. No operations were {phrase} yet. Operations to sync: %s",
self._operations_queue.last_sequence_id + 1,
)
else:
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY:
if verbose and should_print_message(last_message_printed):
last_message_printed = time.time()
logger.info("Waiting. No operations were submitted yet")
logger.info(f"Waiting. No operations were {phrase} yet")
else:
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY:
if verbose and should_print_message(last_message_printed):
last_message_printed = time.time()
logger.info(
"Waiting until remaining %d operation(s) will be submitted",
f"Waiting until remaining %d operation(s) will be {phrase}",
last_queued_sequence_id - value + 1,
)

# Reaching the last queued sequence ID means that all operations were submitted
if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout):
break

logger.info("All operations were submitted")
if verbose:
logger.info(f"All operations were {phrase}")

def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = True) -> None:
    """
    Block until every queued operation has been submitted to Neptune.

    Delegates to `_wait`, watching the submission counter (`_last_put_seq`)
    through its condition object (`_last_put_seq_wait`).

    Args:
        timeout: Maximum time to wait for submission. `None` waits indefinitely.
        verbose: Whether to print messages about the waiting process
    """
    # Gather the submission-specific pieces first, then hand them to the
    # shared waiting loop.
    polling_interval = MINIMAL_WAIT_FOR_PUT_SLEEP_TIME
    submission_condition = self._last_put_seq_wait
    submission_counter = self._last_put_seq

    self._wait(
        phrase="submitted",
        sleep_time=polling_interval,
        wait_condition=submission_condition,
        external_value=submission_counter,
        timeout=timeout,
        verbose=verbose,
    )

def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = True) -> None:
"""
Expand All @@ -387,51 +412,16 @@ def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = T
timeout: Maximum time to wait for processing.
verbose: Whether to print messages about the waiting process
"""
begin_time = time.time()
if verbose:
logger.info("Waiting for all operations to be processed")
if timeout is None and verbose:
logger.warning("No timeout specified. Waiting indefinitely")

with self._lock:
if not self._sync_process.is_alive():
if verbose:
logger.warning("Sync process is not running")
return # No need to wait if the sync process is not running

sleep_time_wait = (
min(MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, timeout) if timeout is not None else MINIMAL_WAIT_FOR_PUT_SLEEP_TIME
self._wait(
phrase="processed",
sleep_time=MINIMAL_WAIT_FOR_ACK_SLEEP_TIME,
wait_condition=self._last_ack_seq_wait,
external_value=self._last_ack_seq,
timeout=timeout,
verbose=verbose,
)
last_queued_sequence_id = self._operations_queue.last_sequence_id
last_message_printed: Optional[float] = None
while True:
with self._last_ack_seq_wait:
self._last_ack_seq_wait.wait(timeout=sleep_time_wait)
value = self._last_ack_seq.value
if value == -1:
if self._operations_queue.last_sequence_id != -1:
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY:
last_message_printed = time.time()
if verbose:
logger.info(
"Waiting. No operations were processed yet. Operations to sync: %s",
self._operations_queue.last_sequence_id + 1,
)
else:
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY:
last_message_printed = time.time()
if verbose:
logger.info("Waiting. No operations were processed yet")
else:
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY:
last_message_printed = time.time()
if verbose:
logger.info(
"Waiting until remaining %d operation(s) will be processed",
last_queued_sequence_id - value + 1,
)
if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout):
break

if verbose:
logger.info("All operations were processed")

def should_print_message(last_message_printed: Optional[float]) -> bool:
    """
    Decide whether a progress message is due.

    Args:
        last_message_printed: `time.time()` timestamp of the previous message,
            or `None` if nothing has been printed yet.

    Returns:
        `True` when no message was printed before, or when more than
        `STOP_MESSAGE_FREQUENCY` has elapsed since the previous one.
    """
    # The very first message is always allowed.
    if last_message_printed is None:
        return True

    elapsed = time.time() - last_message_printed
    return elapsed > STOP_MESSAGE_FREQUENCY
5 changes: 4 additions & 1 deletion src/neptune_scale/api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
Error,
)
from neptune_api.proto.google_rpc.code_pb2 import Code
from neptune_api.proto.neptune_pb.ingest.v1.ingest_pb2 import IngestCode
from neptune_api.proto.neptune_pb.ingest.v1.pub.client_pb2 import (
BulkRequestStatus,
RequestId,
Expand Down Expand Up @@ -150,7 +151,9 @@ def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequ
response_body = BulkRequestStatus(
statuses=list(
map(
lambda _: RequestStatus(code_by_count=[RequestStatus.CodeByCount(count=1, code=Code.OK)]),
lambda _: RequestStatus(
code_by_count=[RequestStatus.CodeByCount(count=1, code=Code.OK, detail=IngestCode.OK)]
),
request_ids,
)
)
Expand Down

0 comments on commit d65985c

Please sign in to comment.