diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py index 643f25c..aad3866 100644 --- a/src/neptune_scale/api/attribute.py +++ b/src/neptune_scale/api/attribute.py @@ -116,7 +116,7 @@ def log( ) for operation, metadata_size in splitter: - self._operations_queue.enqueue(operation=operation, size=metadata_size, step=step) + self._operations_queue.enqueue_run_op(operation=operation, size=metadata_size, step=step) def _verify_and_update_metrics_state(self, step: Optional[float], metrics: Optional[Dict[str, float]]) -> None: """Check if step in provided metrics is increasing, raise `NeptuneSeriesStepNonIncreasing` if not.""" diff --git a/src/neptune_scale/api/run.py b/src/neptune_scale/api/run.py index 5e053b8..cbed706 100644 --- a/src/neptune_scale/api/run.py +++ b/src/neptune_scale/api/run.py @@ -225,6 +225,9 @@ def __init__( self._last_ack_timestamp = SharedFloat(-1) self._process_link = ProcessLink() + + self._files_in_progress_count = SharedInt(0) + self._sync_process = SyncProcess( project=self._project, family=self._run_id, @@ -235,6 +238,7 @@ def __init__( last_queued_seq=self._last_queued_seq, last_ack_seq=self._last_ack_seq, last_ack_timestamp=self._last_ack_timestamp, + files_in_progress_counter=self._files_in_progress_count, max_queue_size=max_queue_size, mode=mode, ) @@ -392,7 +396,7 @@ def _create_run( creation_time=None if creation_time is None else datetime_to_proto(creation_time), ), ) - self._operations_queue.enqueue(operation=operation) + self._operations_queue.enqueue_run_op(operation=operation) def __getitem__(self, key: str) -> Attribute: return self._attr_store[key] @@ -556,6 +560,14 @@ def log( step=step, timestamp=timestamp, configs=configs, metrics=metrics, tags_add=tags_add, tags_remove=tags_remove ) + def log_file( + self, + attribute_path: str, + local_path: str, + target_base_name: Optional[str] = None, + target_path: Optional[str] = None, + ) -> None: ... + def _wait( self, phrase: str, diff --git a/src/neptune_scale/sync/dispatcher.py b/src/neptune_scale/sync/dispatcher.py new file mode 100644 index 0000000..c225151 --- /dev/null +++ b/src/neptune_scale/sync/dispatcher.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import multiprocessing +import queue +from typing import Optional + +from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation + +from neptune_scale.exceptions import ( + NeptuneOperationsQueueMaxSizeExceeded, + NeptuneSynchronizationStopped, +) +from neptune_scale.sync.aggregating_queue import AggregatingQueue +from neptune_scale.sync.errors_tracking import ErrorsQueue +from neptune_scale.sync.file_upload import FileUploader +from neptune_scale.sync.parameters import INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME +from neptune_scale.sync.queue_element import ( + OperationMessage, + OperationType, +) +from neptune_scale.util import ( + Daemon, + logger, +) +from neptune_scale.util.abstract import Resource + + +class OperationDispatcherThread(Daemon, Resource): + """Retrieves messages from the operations queue that is fed by the main process. Dispatches messages based on + their type: + * SingleOperation: common logging operations - push to the aggregating queue + * UploadFileOperation: push to file upload queue + """ + + def __init__( + self, + input_queue: multiprocessing.Queue[OperationMessage], + operations_queue: AggregatingQueue, + errors_queue: ErrorsQueue, + file_uploader: FileUploader, + ) -> None: + super().__init__(name="OperationDispatcherThread", sleep_time=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) + + self._input_queue: multiprocessing.Queue[OperationMessage] = input_queue + self._operations_queue: AggregatingQueue = operations_queue + self._errors_queue: ErrorsQueue = errors_queue + self._file_uploader: FileUploader = file_uploader + + self._latest_unprocessed: Optional[OperationMessage] = None + + def get_next(self) -> Optional[OperationMessage]: + if self._latest_unprocessed is not None: + return self._latest_unprocessed + + try: + self._latest_unprocessed = self._input_queue.get(timeout=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) + return self._latest_unprocessed + except queue.Empty: + return None + + def commit(self) -> None: + self._latest_unprocessed = None + + def work(self) -> None: + try: + while not self._is_interrupted(): + message = self.get_next() + if message is None: + continue + + if not self.dispatch(message): + break + except Exception as e: + self._errors_queue.put(e) + self.interrupt() + raise NeptuneSynchronizationStopped() from e + + def dispatch(self, message: OperationMessage) -> bool: + op = message.operation + try: + if message.type == OperationType.SINGLE_OPERATION: + self._operations_queue.put_nowait(op) + elif message.type == OperationType.UPLOAD_FILE: + self._file_uploader.start_upload( + self._finalize_file_upload, op.local_path, op.target_path, op.target_basename + ) + + self.commit() + return True + except queue.Full: + logger.debug( + "Operations queue is full (%d elements), waiting for free space", self._operations_queue.maxsize + ) + self._errors_queue.put(NeptuneOperationsQueueMaxSizeExceeded(max_size=self._operations_queue.maxsize)) + return False + + def _finalize_file_upload(self, path: str, error: Optional[Exception]) -> None: + if error: + self._errors_queue.put(error) + return + + op = RunOperation() + # TODO: Fill it out once we have established the protocol with the backend + self._operations_queue.put_nowait(op) diff --git a/src/neptune_scale/sync/file_upload.py b/src/neptune_scale/sync/file_upload.py new file mode 100644 index 0000000..1f022cc --- /dev/null +++ b/src/neptune_scale/sync/file_upload.py @@ -0,0 +1,77 @@ +from concurrent import futures +from pathlib import Path +from typing import ( + Optional, + Protocol, +) + +from neptune_scale.sync.errors_tracking import ErrorsQueue +from neptune_scale.util import SharedInt + + +class Finalizer(Protocol): + def __call__(self, path: str, error: Optional[Exception] = None) -> None: ... + + +class FileUploader: + def __init__( + self, project: str, api_token: str, family: str, in_progress_counter: SharedInt, errors_queue: ErrorsQueue + ) -> None: + self._project = project + self._api_token = api_token + self._family = family + self._errors_queue = errors_queue + self._in_progress_counter = in_progress_counter + self._executor = futures.ThreadPoolExecutor() + + def start_upload( + self, finalizer: Finalizer, local_path: Path, target_path: Optional[str], target_basename: Optional[str] + ) -> None: + with self._in_progress_counter: + self._in_progress_counter.value += 1 + + self._executor.submit(self._do_upload, finalizer, local_path, target_path, target_basename) + + def _do_upload( + self, finalizer: Finalizer, local_path: Path, target_path: Optional[str], target_basename: Optional[str] + ) -> None: + path = determine_path(local_path, target_path, target_basename) + + try: + url = self._request_upload_url(path) + upload_file(local_path, url) + error = None + except Exception as e: + error = e + + finalizer(path, error=error) + + with self._in_progress_counter: + self._in_progress_counter.value -= 1 + assert self._in_progress_counter.value >= 0 + + self._in_progress_counter.notify_all() + + def _request_upload_url(self, path) -> str: + assert self._api_token + # TODO: temporary + return "http://localhost:8012/" + path + + def wait_for_completion(self) -> None: + self._executor.shutdown() + with self._in_progress_counter: + assert self._in_progress_counter.value >= 0 + + +def determine_path(local_path: Path, target_path: Optional[str], target_basename: Optional[str]) -> str: + if target_path: + return target_path + + # TODO: figure out the path + return str(Path("DUMMY_PATH") / local_path) + + +def upload_file(local_path: Path, url: str) -> None: + # TODO: do the actual work :) + assert local_path and url + pass diff --git a/src/neptune_scale/sync/operations_queue.py b/src/neptune_scale/sync/operations_queue.py index f1ec0df..a6c2793 100644 --- a/src/neptune_scale/sync/operations_queue.py +++ b/src/neptune_scale/sync/operations_queue.py @@ -15,7 +15,11 @@ MAX_QUEUE_ELEMENT_SIZE, MAX_QUEUE_SIZE, ) -from neptune_scale.sync.queue_element import SingleOperation +from neptune_scale.sync.queue_element import ( + OperationMessage, + OperationType, + SingleOperation, +) from neptune_scale.util import get_logger from neptune_scale.util.abstract import Resource @@ -41,10 +45,10 @@ def __init__( self._sequence_id: int = 0 self._last_timestamp: Optional[float] = None - self._queue: Queue[SingleOperation] = Queue(maxsize=min(MAX_MULTIPROCESSING_QUEUE_SIZE, max_size)) + self._queue: Queue[OperationMessage] = Queue(maxsize=min(MAX_MULTIPROCESSING_QUEUE_SIZE, max_size)) @property - def queue(self) -> Queue[SingleOperation]: + def queue(self) -> Queue[OperationMessage]: return self._queue @property @@ -57,7 +61,16 @@ def last_timestamp(self) -> Optional[float]: with self._lock: return self._last_timestamp - def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, step: Optional[float] = None) -> None: + def enqueue(self, message: OperationMessage) -> int: + with self._lock: + self._queue.put(message, block=True, timeout=None) + self._sequence_id += 1 + + return self._sequence_id - 1 + + def enqueue_run_op( + self, *, operation: RunOperation, size: Optional[int] = None, step: Optional[float] = None + ) -> int: try: is_metadata_update = operation.HasField("update") serialized_operation = operation.SerializeToString() @@ -69,18 +82,23 @@ def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, step: self._last_timestamp = monotonic() # TODO: should we not block here, and just call the error callback if we were to block? self._queue.put( - SingleOperation( - sequence_id=self._sequence_id, - timestamp=self._last_timestamp, - operation=serialized_operation, - metadata_size=size, - is_batchable=is_metadata_update, - step=step, + OperationMessage( + OperationType.SINGLE_OPERATION, + SingleOperation( + sequence_id=self._sequence_id, + timestamp=self._last_timestamp, + operation=serialized_operation, + metadata_size=size, + is_batchable=is_metadata_update, + step=step, + ), ), block=True, timeout=None, ) self._sequence_id += 1 + + return self._sequence_id - 1 except Exception as e: logger.error("Failed to enqueue operation: %s %s", e, operation) raise e diff --git a/src/neptune_scale/sync/queue_element.py b/src/neptune_scale/sync/queue_element.py index da33eae..53b6261 100644 --- a/src/neptune_scale/sync/queue_element.py +++ b/src/neptune_scale/sync/queue_element.py @@ -1,8 +1,14 @@ -__all__ = ("BatchedOperations", "SingleOperation") +__all__ = ("BatchedOperations", "SingleOperation", "UploadFile", "OperationType", "OperationMessage") +import pathlib +from enum import ( + Enum, + auto, +) from typing import ( NamedTuple, Optional, + Union, ) @@ -28,3 +34,19 @@ class SingleOperation(NamedTuple): metadata_size: Optional[int] # Step provided by the user step: Optional[float] + + +class UploadFile(NamedTuple): + local_path: pathlib.Path + target_path: Optional[str] + target_basename: Optional[str] + + +class OperationType(Enum): + UPLOAD_FILE = auto() + SINGLE_OPERATION = auto() + + +class OperationMessage(NamedTuple): + type: OperationType + operation: Union[SingleOperation, UploadFile] diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py index c23604f..2e95505 100644 --- a/src/neptune_scale/sync/sync_process.py +++ b/src/neptune_scale/sync/sync_process.py @@ -41,7 +41,6 @@ NeptuneConnectionLostError, NeptuneFloatValueNanInfUnsupported, NeptuneInternalServerError, - NeptuneOperationsQueueMaxSizeExceeded, NeptuneProjectInvalidName, NeptuneProjectNotFound, NeptuneRetryableError, @@ -67,9 +66,10 @@ with_api_errors_handling, ) from neptune_scale.sync.aggregating_queue import AggregatingQueue +from neptune_scale.sync.dispatcher import OperationDispatcherThread from neptune_scale.sync.errors_tracking import ErrorsQueue +from neptune_scale.sync.file_upload import FileUploader from neptune_scale.sync.parameters import ( - INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME, MAX_QUEUE_SIZE, MAX_REQUEST_RETRY_SECONDS, MAX_REQUESTS_STATUS_BATCH_SIZE, @@ -177,6 +177,7 @@ def __init__( last_queued_seq: SharedInt, last_ack_seq: SharedInt, last_ack_timestamp: SharedFloat, + files_in_progress_counter: SharedInt, max_queue_size: int = MAX_QUEUE_SIZE, ) -> None: super().__init__(name="SyncProcess") @@ -184,6 +185,9 @@ def __init__( self._input_operations_queue: Queue[SingleOperation] = operations_queue self._errors_queue: ErrorsQueue = errors_queue self._process_link: ProcessLink = process_link + self._file_uploader: FileUploader = FileUploader( + project, api_token, family, files_in_progress_counter, errors_queue + ) self._api_token: str = api_token self._project: str = project self._family: str = family @@ -216,6 +220,7 @@ def run(self) -> None: api_token=self._api_token, errors_queue=self._errors_queue, input_queue=self._input_operations_queue, + file_uploader=self._file_uploader, last_queued_seq=self._last_queued_seq, last_ack_seq=self._last_ack_seq, max_queue_size=self._max_queue_size, @@ -247,6 +252,7 @@ def __init__( mode: Literal["async", "disabled"], errors_queue: ErrorsQueue, input_queue: multiprocessing.Queue[SingleOperation], + file_uploader: FileUploader, last_queued_seq: SharedInt, last_ack_seq: SharedInt, last_ack_timestamp: SharedFloat, @@ -269,6 +275,7 @@ def __init__( input_queue=input_queue, operations_queue=self._internal_operations_queue, errors_queue=self._errors_queue, + file_uploader=file_uploader, ) self._status_tracking_thread = StatusTrackingThread( api_token=api_token, @@ -306,59 +313,6 @@ def join(self, timeout: Optional[int] = None) -> None: thread.join(timeout=timeout) -class OperationDispatcherThread(Daemon, Resource): - def __init__( - self, - input_queue: multiprocessing.Queue[SingleOperation], - operations_queue: AggregatingQueue, - errors_queue: ErrorsQueue, - ) -> None: - super().__init__(name="OperationDispatcherThread", sleep_time=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) - - self._input_queue: multiprocessing.Queue[SingleOperation] = input_queue - self._operations_queue: AggregatingQueue = operations_queue - self._errors_queue: ErrorsQueue = errors_queue - - self._latest_unprocessed: Optional[SingleOperation] = None - - def get_next(self) -> Optional[SingleOperation]: - if self._latest_unprocessed is not None: - return self._latest_unprocessed - - try: - self._latest_unprocessed = self._input_queue.get(timeout=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) - return self._latest_unprocessed - except queue.Empty: - return None - - def commit(self) -> None: - self._latest_unprocessed = None - - def work(self) -> None: - try: - while not self._is_interrupted(): - operation = self.get_next() - if operation is None: - continue - - try: - self._operations_queue.put_nowait(operation) - self.commit() - except queue.Full: - logger.debug( - "Operations queue is full (%d elements), waiting for free space", self._operations_queue.maxsize - ) - self._errors_queue.put( - NeptuneOperationsQueueMaxSizeExceeded(max_size=self._operations_queue.maxsize) - ) - # Sleep before retry - break - except Exception as e: - self._errors_queue.put(e) - self.interrupt() - raise NeptuneSynchronizationStopped() from e - - class SenderThread(Daemon, WithResources): def __init__( self, diff --git a/tests/unit/test_operations_queue.py b/tests/unit/test_operations_queue.py index 76471ee..c3371fa 100644 --- a/tests/unit/test_operations_queue.py +++ b/tests/unit/test_operations_queue.py @@ -19,13 +19,13 @@ def test__enqueue(): operation = RunOperation() # when - queue.enqueue(operation=operation, size=0) + queue.enqueue_run_op(operation=operation, size=0) # then assert queue._sequence_id == 1 # when - queue.enqueue(operation=operation, size=0) + queue.enqueue_run_op(operation=operation, size=0) # then assert queue._sequence_id == 2 @@ -42,4 +42,4 @@ def test__max_element_size_exceeded(): # then with pytest.raises(ValueError): - queue.enqueue(operation=operation, size=snapshot.ByteSize()) + queue.enqueue_run_op(operation=operation, size=snapshot.ByteSize())