Skip to content

Commit

Permalink
Minimal implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky committed Jul 29, 2024
1 parent 6b502b2 commit 41f7f01
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 12 deletions.
46 changes: 34 additions & 12 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@

__all__ = ["Run"]

import threading
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Callable

from neptune_scale.core.components.abstract import (
Resource,
WithResources,
)
from neptune_scale.core.components.operations_queue import OperationsQueue
from neptune_scale.core.validation import (
verify_max_length,
verify_non_empty,
Expand All @@ -17,16 +23,26 @@
)
from neptune_scale.parameters import (
MAX_FAMILY_LENGTH,
MAX_QUEUE_SIZE,
MAX_RUN_ID_LENGTH,
)


class Run(AbstractContextManager):
class Run(WithResources, AbstractContextManager):
"""
Representation of tracked metadata.
"""

def __init__(self, *, project: str, api_token: str, family: str, run_id: str) -> None:
def __init__(
self,
*,
project: str,
api_token: str,
family: str,
run_id: str,
queue_max_size: int = MAX_QUEUE_SIZE,
queue_max_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
"""
Initializes a run that logs the model-building metadata to Neptune.
Expand All @@ -36,10 +52,15 @@ def __init__(self, *, project: str, api_token: str, family: str, run_id: str) ->
family: Identifies related runs. For example, the same value must apply to all runs within a run hierarchy.
Max length: 128 characters.
run_id: Unique identifier of a run. Must be unique within the project. Max length: 128 characters.
queue_max_size: Maximum number of operations that can be queued before the queue is considered full.
queue_max_size_exceeded_callback: Callback function called when the queue is full. Accepts two arguments:
the maximum size of the queue and the exception that caused the queue to be full.
"""
verify_type("api_token", api_token, str)
verify_type("family", family, str)
verify_type("run_id", run_id, str)
verify_type("queue_max_size", queue_max_size, int)
verify_type("queue_max_size_exceeded_callback", queue_max_size_exceeded_callback, (Callable, type(None)))

verify_non_empty("api_token", api_token)
verify_non_empty("family", family)
Expand All @@ -55,19 +76,20 @@ def __init__(self, *, project: str, api_token: str, family: str, run_id: str) ->
self._family: str = family
self._run_id: str = run_id

self._lock = threading.RLock()
self._operations_queue: OperationsQueue = OperationsQueue(
lock=self._lock, max_size=queue_max_size, max_size_exceeded_callback=queue_max_size_exceeded_callback
)

def __enter__(self) -> Run:
return self

@property
def resources(self) -> tuple[Resource, ...]:
return (self._operations_queue,)

def close(self) -> None:
"""
Stops the connection to Neptune and synchronizes all data.
"""
pass

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
super().close()
Empty file.
52 changes: 52 additions & 0 deletions src/neptune_scale/core/components/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

from abc import (
ABC,
abstractmethod,
)
from types import TracebackType


class AutoCloseable(ABC):
def __enter__(self) -> AutoCloseable:
return self

@abstractmethod
def close(self) -> None: ...

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.close()


class Resource(AutoCloseable):
@abstractmethod
def cleanup(self) -> None: ...

def flush(self) -> None:
pass

def close(self) -> None:
self.flush()


class WithResources(Resource):
@property
@abstractmethod
def resources(self) -> tuple[Resource, ...]: ...

def flush(self) -> None:
for resource in self.resources:
resource.flush()

def close(self) -> None:
for resource in self.resources:
resource.close()

def cleanup(self) -> None:
for resource in self.resources:
resource.cleanup()
63 changes: 63 additions & 0 deletions src/neptune_scale/core/components/operations_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

__all__ = ("OperationsQueue",)

from multiprocessing import Queue
from time import monotonic
from typing import (
TYPE_CHECKING,
Callable,
NamedTuple,
)

from neptune_scale.core.components.abstract import Resource
from neptune_scale.core.validation import verify_type

if TYPE_CHECKING:
from threading import RLock

from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation


class QueueElement(NamedTuple):
sequence_id: int
occured_at: float
operation: bytes


def default_max_size_exceeded_callback(max_size: int, e: BaseException) -> None:
raise ValueError(f"Queue is full (max size: {max_size})") from e


class OperationsQueue(Resource):
def __init__(
self,
*,
lock: RLock,
max_size: int = 0,
max_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
verify_type("max_size", max_size, int)

self._lock: RLock = lock
self._max_size: int = max_size
self._max_size_exceeded_callback: Callable[[int, BaseException], None] = (
max_size_exceeded_callback if max_size_exceeded_callback is not None else default_max_size_exceeded_callback
)

self._sequence_id: int = 0
self._queue: Queue[QueueElement] = Queue(maxsize=max_size)

def enqueue(self, *, operation: RunOperation) -> None:
try:
with self._lock:
self._queue.put_nowait(QueueElement(self._sequence_id, monotonic(), operation.SerializeToString()))
self._sequence_id += 1
except Exception as e:
self._max_size_exceeded_callback(self._max_size, e)

def cleanup(self) -> None:
pass

def close(self) -> None:
self._queue.close()
1 change: 1 addition & 0 deletions src/neptune_scale/parameters.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
MAX_RUN_ID_LENGTH = 128
MAX_FAMILY_LENGTH = 128
MAX_QUEUE_SIZE = 32767
26 changes: 26 additions & 0 deletions tests/unit/test_operations_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import threading

from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.core.components.operations_queue import OperationsQueue


def test_operations_queue():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=0)

# and
operation = RunOperation()

# when
queue.enqueue(operation=operation)

# then
assert queue._sequence_id == 1

# when
queue.enqueue(operation=operation)

# then
assert queue._sequence_id == 2

0 comments on commit 41f7f01

Please sign in to comment.