Skip to content

Commit

Permalink
Added OperationsQueue component (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky authored Jul 29, 2024
1 parent 6cf6715 commit b85f1c5
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Added minimal Run classes ([#6](https://github.com/neptune-ai/neptune-client-scale/pull/6))
- Added support for `max_queue_size` and `max_queue_size_exceeded_callback` parameters in `Run` ([#7](https://github.com/neptune-ai/neptune-client-scale/pull/7))
48 changes: 36 additions & 12 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@

__all__ = ["Run"]

import threading
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Callable

from neptune_scale.core.components.abstract import (
Resource,
WithResources,
)
from neptune_scale.core.components.operations_queue import OperationsQueue
from neptune_scale.core.validation import (
verify_max_length,
verify_non_empty,
Expand All @@ -17,16 +23,26 @@
)
from neptune_scale.parameters import (
MAX_FAMILY_LENGTH,
MAX_QUEUE_SIZE,
MAX_RUN_ID_LENGTH,
)


class Run(AbstractContextManager):
class Run(WithResources, AbstractContextManager):
"""
Representation of tracked metadata.
"""

def __init__(self, *, project: str, api_token: str, family: str, run_id: str) -> None:
def __init__(
self,
*,
project: str,
api_token: str,
family: str,
run_id: str,
max_queue_size: int = MAX_QUEUE_SIZE,
max_queue_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
"""
Initializes a run that logs the model-building metadata to Neptune.
Expand All @@ -36,10 +52,17 @@ def __init__(self, *, project: str, api_token: str, family: str, run_id: str) ->
family: Identifies related runs. For example, the same value must apply to all runs within a run hierarchy.
Max length: 128 characters.
run_id: Unique identifier of a run. Must be unique within the project. Max length: 128 characters.
max_queue_size: Maximum number of operations in a queue.
max_queue_size_exceeded_callback: Callback function triggered when a queue is full.
Accepts two arguments:
- Maximum size of the queue.
- Exception that made the queue full.
"""
verify_type("api_token", api_token, str)
verify_type("family", family, str)
verify_type("run_id", run_id, str)
verify_type("max_queue_size", max_queue_size, int)
verify_type("max_queue_size_exceeded_callback", max_queue_size_exceeded_callback, (Callable, type(None)))

verify_non_empty("api_token", api_token)
verify_non_empty("family", family)
Expand All @@ -55,19 +78,20 @@ def __init__(self, *, project: str, api_token: str, family: str, run_id: str) ->
self._family: str = family
self._run_id: str = run_id

self._lock = threading.RLock()
self._operations_queue: OperationsQueue = OperationsQueue(
lock=self._lock, max_size=max_queue_size, max_size_exceeded_callback=max_queue_size_exceeded_callback
)

def __enter__(self) -> Run:
return self

@property
def resources(self) -> tuple[Resource, ...]:
return (self._operations_queue,)

def close(self) -> None:
"""
Stops the connection to Neptune and synchronizes all data.
"""
pass

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
super().close()
Empty file.
52 changes: 52 additions & 0 deletions src/neptune_scale/core/components/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

from abc import (
ABC,
abstractmethod,
)
from types import TracebackType


class AutoCloseable(ABC):
def __enter__(self) -> AutoCloseable:
return self

@abstractmethod
def close(self) -> None: ...

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.close()


class Resource(AutoCloseable):
@abstractmethod
def cleanup(self) -> None: ...

def flush(self) -> None:
pass

def close(self) -> None:
self.flush()


class WithResources(Resource):
@property
@abstractmethod
def resources(self) -> tuple[Resource, ...]: ...

def flush(self) -> None:
for resource in self.resources:
resource.flush()

def close(self) -> None:
for resource in self.resources:
resource.close()

def cleanup(self) -> None:
for resource in self.resources:
resource.cleanup()
69 changes: 69 additions & 0 deletions src/neptune_scale/core/components/operations_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

__all__ = ("OperationsQueue",)

from multiprocessing import Queue
from time import monotonic
from typing import (
TYPE_CHECKING,
Callable,
NamedTuple,
)

from neptune_scale.core.components.abstract import Resource
from neptune_scale.core.validation import verify_type
from neptune_scale.parameters import MAX_QUEUE_ELEMENT_SIZE

if TYPE_CHECKING:
from threading import RLock

from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation


class QueueElement(NamedTuple):
sequence_id: int
occured_at: float
operation: bytes


def default_max_size_exceeded_callback(max_size: int, e: BaseException) -> None:
raise ValueError(f"Queue is full (max size: {max_size})") from e


class OperationsQueue(Resource):
def __init__(
self,
*,
lock: RLock,
max_size: int = 0,
max_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
verify_type("max_size", max_size, int)

self._lock: RLock = lock
self._max_size: int = max_size
self._max_size_exceeded_callback: Callable[[int, BaseException], None] = (
max_size_exceeded_callback if max_size_exceeded_callback is not None else default_max_size_exceeded_callback
)

self._sequence_id: int = 0
self._queue: Queue[QueueElement] = Queue(maxsize=max_size)

def enqueue(self, *, operation: RunOperation) -> None:
try:
with self._lock:
serialized_operation = operation.SerializeToString()

if len(serialized_operation) > MAX_QUEUE_ELEMENT_SIZE:
raise ValueError(f"Operation size exceeds the maximum allowed size ({MAX_QUEUE_ELEMENT_SIZE})")

self._queue.put_nowait(QueueElement(self._sequence_id, monotonic(), serialized_operation))
self._sequence_id += 1
except Exception as e:
self._max_size_exceeded_callback(self._max_size, e)

def cleanup(self) -> None:
pass

def close(self) -> None:
self._queue.close()
2 changes: 2 additions & 0 deletions src/neptune_scale/parameters.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
MAX_RUN_ID_LENGTH = 128
MAX_FAMILY_LENGTH = 128
MAX_QUEUE_SIZE = 32767
MAX_QUEUE_ELEMENT_SIZE = 1024 * 1024 # 1MB
63 changes: 63 additions & 0 deletions tests/unit/test_operations_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import threading
from unittest.mock import MagicMock

import pytest
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import (
UpdateRunSnapshot,
Value,
)
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.core.components.operations_queue import OperationsQueue


def test__enqueue():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=0)

# and
operation = RunOperation()

# when
queue.enqueue(operation=operation)

# then
assert queue._sequence_id == 1

# when
queue.enqueue(operation=operation)

# then
assert queue._sequence_id == 2


def test__max_queue_size_exceeded():
# given
lock = threading.RLock()
callback = MagicMock()
queue = OperationsQueue(lock=lock, max_size=1, max_size_exceeded_callback=callback)

# and
operation = RunOperation()

# when
queue.enqueue(operation=operation)
queue.enqueue(operation=operation)

# then
callback.assert_called_once()


def test__max_element_size_exceeded():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=1)

# and
snapshot = UpdateRunSnapshot(assign={f"key_{i}": Value(string=("a" * 1024)) for i in range(1024)})
operation = RunOperation(update=snapshot)

# then
with pytest.raises(ValueError):
queue.enqueue(operation=operation)

0 comments on commit b85f1c5

Please sign in to comment.