Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kgodlewski committed Dec 9, 2024
1 parent 022ce35 commit 4909b26
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 72 deletions.
2 changes: 1 addition & 1 deletion src/neptune_scale/api/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def log(
)

for operation, metadata_size in splitter:
self._operations_queue.enqueue(operation=operation, size=metadata_size, step=step)
self._operations_queue.enqueue_run_op(operation=operation, size=metadata_size, step=step)

def _verify_and_update_metrics_state(self, step: Optional[float], metrics: Optional[Dict[str, float]]) -> None:
"""Check if step in provided metrics is increasing, raise `NeptuneSeriesStepNonIncreasing` if not."""
Expand Down
14 changes: 13 additions & 1 deletion src/neptune_scale/api/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ def __init__(
self._last_ack_timestamp = SharedFloat(-1)

self._process_link = ProcessLink()

self._files_in_progress_count = SharedInt(0)

self._sync_process = SyncProcess(
project=self._project,
family=self._run_id,
Expand All @@ -235,6 +238,7 @@ def __init__(
last_queued_seq=self._last_queued_seq,
last_ack_seq=self._last_ack_seq,
last_ack_timestamp=self._last_ack_timestamp,
files_in_progress_counter=self._files_in_progress_count,
max_queue_size=max_queue_size,
mode=mode,
)
Expand Down Expand Up @@ -392,7 +396,7 @@ def _create_run(
creation_time=None if creation_time is None else datetime_to_proto(creation_time),
),
)
self._operations_queue.enqueue(operation=operation)
self._operations_queue.enqueue_run_op(operation=operation)

def __getitem__(self, key: str) -> Attribute:
return self._attr_store[key]
Expand Down Expand Up @@ -556,6 +560,14 @@ def log(
step=step, timestamp=timestamp, configs=configs, metrics=metrics, tags_add=tags_add, tags_remove=tags_remove
)

def log_file(
    # NOTE(review): stub only. The body is `...`, so calling this currently does nothing and returns None.
    # The comments on the parameters below are assumptions drawn from their names; confirm once implemented.
    self,
    # presumably the attribute under which the file is logged -- TODO confirm
    attribute_path: str,
    # presumably the path of the file on the local filesystem -- TODO confirm
    local_path: str,
    # NOTE(review): spelled `target_base_name` here, but `target_basename` in `UploadFile` and `FileUploader`
    target_base_name: Optional[str] = None,
    # presumably the destination path of the uploaded file -- TODO confirm
    target_path: Optional[str] = None,
) -> None: ...

def _wait(
self,
phrase: str,
Expand Down
104 changes: 104 additions & 0 deletions src/neptune_scale/sync/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from __future__ import annotations

import multiprocessing
import queue
from typing import Optional

from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.exceptions import (
NeptuneOperationsQueueMaxSizeExceeded,
NeptuneSynchronizationStopped,
)
from neptune_scale.sync.aggregating_queue import AggregatingQueue
from neptune_scale.sync.errors_tracking import ErrorsQueue
from neptune_scale.sync.file_upload import FileUploader
from neptune_scale.sync.parameters import INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME
from neptune_scale.sync.queue_element import (
OperationMessage,
OperationType,
)
from neptune_scale.util import (
Daemon,
logger,
)
from neptune_scale.util.abstract import Resource


class OperationDispatcherThread(Daemon, Resource):
    """Retrieves messages from the operations queue that is fed by the main process. Dispatches messages based on
    their type:
        * OperationType.SINGLE_OPERATION: common logging operations - push to the aggregating queue
        * OperationType.UPLOAD_FILE: hand over to the file uploader

    A message stays buffered in `_latest_unprocessed` until `commit()` is called. A message that could not be
    dispatched because the aggregating queue was full is therefore returned again by the next `get_next()` call
    instead of being lost.
    """

    def __init__(
        self,
        input_queue: multiprocessing.Queue[OperationMessage],
        operations_queue: AggregatingQueue,
        errors_queue: ErrorsQueue,
        file_uploader: FileUploader,
    ) -> None:
        """Store the collaborators; no work is started here.

        Args:
            input_queue: queue of `OperationMessage` items to dispatch.
            operations_queue: destination for SINGLE_OPERATION payloads.
            errors_queue: sink for every error encountered while dispatching.
            file_uploader: executes UPLOAD_FILE payloads.
        """
        super().__init__(name="OperationDispatcherThread", sleep_time=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME)

        self._input_queue: multiprocessing.Queue[OperationMessage] = input_queue
        self._operations_queue: AggregatingQueue = operations_queue
        self._errors_queue: ErrorsQueue = errors_queue
        self._file_uploader: FileUploader = file_uploader

        # The message fetched from the input queue that has not been committed yet.
        self._latest_unprocessed: Optional[OperationMessage] = None

    def get_next(self) -> Optional[OperationMessage]:
        """Return the pending uncommitted message, or fetch a new one from the input queue.

        Returns:
            The next message to dispatch, or None if the input queue stayed empty for
            `INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME` seconds.
        """
        if self._latest_unprocessed is not None:
            return self._latest_unprocessed

        try:
            self._latest_unprocessed = self._input_queue.get(timeout=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME)
            return self._latest_unprocessed
        except queue.Empty:
            return None

    def commit(self) -> None:
        """Mark the pending message as dispatched so that `get_next()` fetches a fresh one."""
        self._latest_unprocessed = None

    def work(self) -> None:
        """Dispatch messages until interrupted or until a message cannot be dispatched.

        Raises:
            NeptuneSynchronizationStopped: on any unexpected error; the original exception is first put on
                the errors queue and the thread is interrupted.
        """
        try:
            while not self._is_interrupted():
                message = self.get_next()
                if message is None:
                    continue

                # A False result means the aggregating queue is full. Leave the loop; the message is still
                # buffered and will be retried the next time work() runs.
                if not self.dispatch(message):
                    break
        except Exception as e:
            self._errors_queue.put(e)
            self.interrupt()
            raise NeptuneSynchronizationStopped() from e

    def dispatch(self, message: OperationMessage) -> bool:
        """Route a single message to its destination and commit it.

        Returns:
            True if the message was handed over and committed, False if the aggregating queue was full
            (the message is then left uncommitted and an error is reported on the errors queue).
        """
        op = message.operation
        try:
            if message.type == OperationType.SINGLE_OPERATION:
                self._operations_queue.put_nowait(op)
            elif message.type == OperationType.UPLOAD_FILE:
                self._file_uploader.start_upload(
                    self._finalize_file_upload, op.local_path, op.target_path, op.target_basename
                )

            self.commit()
            return True
        except queue.Full:
            self._report_queue_full()
            return False

    def _report_queue_full(self) -> None:
        """Log the full aggregating queue and report it on the errors queue."""
        logger.debug(
            "Operations queue is full (%d elements), waiting for free space", self._operations_queue.maxsize
        )
        self._errors_queue.put(NeptuneOperationsQueueMaxSizeExceeded(max_size=self._operations_queue.maxsize))

    def _finalize_file_upload(self, path: str, error: Optional[Exception] = None) -> None:
        """Callback passed to the file uploader; called once an upload attempt has finished.

        The `error` default mirrors the `Finalizer` protocol so this method is a valid `Finalizer`.

        Args:
            path: path of the uploaded file, as passed in by the uploader.
            error: the exception that made the upload fail, or None on success.
        """
        if error is not None:
            self._errors_queue.put(error)
            return

        op = RunOperation()
        # TODO: Fill it out once we have established the protocol with the backend
        try:
            self._operations_queue.put_nowait(op)
        except queue.Full:
            # This callback runs on an uploader worker thread, where an escaping exception would be stored
            # in a future nobody inspects. Report it through the errors queue instead.
            self._report_queue_full()
77 changes: 77 additions & 0 deletions src/neptune_scale/sync/file_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from concurrent import futures
from pathlib import Path
from typing import (
Optional,
Protocol,
)

from neptune_scale.sync.errors_tracking import ErrorsQueue
from neptune_scale.util import SharedInt


class Finalizer(Protocol):
    """Callback that `FileUploader._do_upload` invokes once an upload attempt has finished.

    `path` is the value returned by `determine_path` for the upload. `error` is the exception raised while
    requesting the upload URL or uploading the file, or None if both steps succeeded.
    """

    def __call__(self, path: str, error: Optional[Exception] = None) -> None: ...


class FileUploader:
    """Runs file uploads on a thread pool and tracks how many of them are still in flight.

    `in_progress_counter` is incremented when an upload is scheduled and decremented, with waiters notified,
    when the upload and its finalizer are done. The decrement happens whether or not they succeeded.
    """

    def __init__(
        self, project: str, api_token: str, family: str, in_progress_counter: SharedInt, errors_queue: ErrorsQueue
    ) -> None:
        """Store the configuration and create the worker pool (default `ThreadPoolExecutor` sizing)."""
        self._project = project
        self._api_token = api_token
        self._family = family
        self._errors_queue = errors_queue
        self._in_progress_counter = in_progress_counter
        self._executor = futures.ThreadPoolExecutor()

    def start_upload(
        self, finalizer: Finalizer, local_path: Path, target_path: Optional[str], target_basename: Optional[str]
    ) -> None:
        """Schedule an upload on the worker pool and return immediately.

        Args:
            finalizer: called from the worker thread with the resolved path and the upload error (or None).
            local_path: file to upload.
            target_path: explicit destination path; when falsy, `determine_path` picks one.
            target_basename: forwarded to `determine_path`.

        Raises:
            Exception: whatever `ThreadPoolExecutor.submit` raises (e.g. RuntimeError after shutdown). The
                in-progress counter is restored before re-raising.
        """
        with self._in_progress_counter:
            self._in_progress_counter.value += 1

        try:
            self._executor.submit(self._do_upload, finalizer, local_path, target_path, target_basename)
        except Exception:
            # Nothing was scheduled, so no worker will ever decrement the counter for this upload.
            self._mark_finished()
            raise

    def _do_upload(
        self, finalizer: Finalizer, local_path: Path, target_path: Optional[str], target_basename: Optional[str]
    ) -> None:
        """Worker-thread body: perform one upload, call the finalizer, then release the in-progress slot."""
        try:
            path = determine_path(local_path, target_path, target_basename)

            error: Optional[Exception]
            try:
                url = self._request_upload_url(path)
                upload_file(local_path, url)
                error = None
            except Exception as e:
                error = e

            finalizer(path, error=error)
        except Exception as e:
            # Reached when determine_path() or the finalizer itself fails. The future returned by submit()
            # is never inspected, so without this the failure would vanish silently.
            self._errors_queue.put(e)
        finally:
            # Always release the slot, otherwise anyone waiting on the counter would block forever.
            self._mark_finished()

    def _mark_finished(self) -> None:
        """Decrement the in-progress counter and wake up waiters, all while holding the counter's lock."""
        with self._in_progress_counter:
            self._in_progress_counter.value -= 1
            assert self._in_progress_counter.value >= 0

            self._in_progress_counter.notify_all()

    def _request_upload_url(self, path: str) -> str:
        """Return the URL to upload `path` to (temporary hard-coded local endpoint)."""
        assert self._api_token
        # TODO: temporary
        return "http://localhost:8012/" + path

    def wait_for_completion(self) -> None:
        """Shut the worker pool down, blocking until every scheduled upload has finished."""
        self._executor.shutdown()
        with self._in_progress_counter:
            assert self._in_progress_counter.value >= 0


def determine_path(local_path: Path, target_path: Optional[str], target_basename: Optional[str]) -> str:
    """Resolve the destination path for an upload.

    A truthy `target_path` is returned unchanged. Otherwise a placeholder path is built by joining
    "DUMMY_PATH" with `local_path`. `target_basename` is currently unused.
    """
    # TODO: figure out the path
    # `or` keeps the fallback lazy: it is only computed when no explicit target path was given.
    return target_path or str(Path("DUMMY_PATH") / local_path)


def upload_file(local_path: Path, url: str) -> None:
    """Placeholder for the actual upload; for now it only checks that both arguments are truthy."""
    # TODO: do the actual work :)
    assert local_path and url
40 changes: 29 additions & 11 deletions src/neptune_scale/sync/operations_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
MAX_QUEUE_ELEMENT_SIZE,
MAX_QUEUE_SIZE,
)
from neptune_scale.sync.queue_element import SingleOperation
from neptune_scale.sync.queue_element import (
OperationMessage,
OperationType,
SingleOperation,
)
from neptune_scale.util import get_logger
from neptune_scale.util.abstract import Resource

Expand All @@ -41,10 +45,10 @@ def __init__(

self._sequence_id: int = 0
self._last_timestamp: Optional[float] = None
self._queue: Queue[SingleOperation] = Queue(maxsize=min(MAX_MULTIPROCESSING_QUEUE_SIZE, max_size))
self._queue: Queue[OperationMessage] = Queue(maxsize=min(MAX_MULTIPROCESSING_QUEUE_SIZE, max_size))

@property
def queue(self) -> Queue[SingleOperation]:
def queue(self) -> Queue[OperationMessage]:
return self._queue

@property
Expand All @@ -57,7 +61,16 @@ def last_timestamp(self) -> Optional[float]:
with self._lock:
return self._last_timestamp

def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, step: Optional[float] = None) -> None:
def enqueue(self, message: OperationMessage) -> int:
    """Put `message` on the underlying queue, blocking until there is room, and advance the sequence counter.

    Returns:
        The sequence id the counter held before it was advanced for this message.
    """
    with self._lock:
        self._queue.put(message, block=True, timeout=None)
        assigned_id = self._sequence_id
        self._sequence_id = assigned_id + 1

    return assigned_id

def enqueue_run_op(
self, *, operation: RunOperation, size: Optional[int] = None, step: Optional[float] = None
) -> int:
try:
is_metadata_update = operation.HasField("update")
serialized_operation = operation.SerializeToString()
Expand All @@ -69,18 +82,23 @@ def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, step:
self._last_timestamp = monotonic()
# TODO: should we not block here, and just call the error callback if we were to block?
self._queue.put(
SingleOperation(
sequence_id=self._sequence_id,
timestamp=self._last_timestamp,
operation=serialized_operation,
metadata_size=size,
is_batchable=is_metadata_update,
step=step,
OperationMessage(
OperationType.SINGLE_OPERATION,
SingleOperation(
sequence_id=self._sequence_id,
timestamp=self._last_timestamp,
operation=serialized_operation,
metadata_size=size,
is_batchable=is_metadata_update,
step=step,
),
),
block=True,
timeout=None,
)
self._sequence_id += 1

return self._sequence_id - 1
except Exception as e:
logger.error("Failed to enqueue operation: %s %s", e, operation)
raise e
Expand Down
24 changes: 23 additions & 1 deletion src/neptune_scale/sync/queue_element.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
__all__ = ("BatchedOperations", "SingleOperation")
__all__ = ("BatchedOperations", "SingleOperation", "UploadFile", "OperationType", "OperationMessage")

import pathlib
from enum import (
Enum,
auto,
)
from typing import (
NamedTuple,
Optional,
Union,
)


Expand All @@ -28,3 +34,19 @@ class SingleOperation(NamedTuple):
metadata_size: Optional[int]
# Step provided by the user
step: Optional[float]


class UploadFile(NamedTuple):
    """Payload of an `OperationType.UPLOAD_FILE` message: the arguments of a single file upload request."""

    # File to upload
    local_path: pathlib.Path
    # Explicit destination path, if any
    target_path: Optional[str]
    # Optional base name for the destination -- NOTE(review): exact semantics not established yet, confirm
    target_basename: Optional[str]


class OperationType(Enum):
    """Discriminator stored in `OperationMessage.type`, telling which kind of payload the message carries."""

    # The message carries an `UploadFile` payload -- presumably; the pairing is by convention, not enforced
    UPLOAD_FILE = auto()
    # The message carries a `SingleOperation` payload -- presumably; the pairing is by convention, not enforced
    SINGLE_OPERATION = auto()


class OperationMessage(NamedTuple):
    """Envelope pairing a payload with an explicit type tag."""

    # Tag identifying the kind of payload held in `operation`
    type: OperationType
    # The payload itself
    operation: Union[SingleOperation, UploadFile]
Loading

0 comments on commit 4909b26

Please sign in to comment.