Skip to content

Commit

Permalink
More tests and element size
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky committed Jul 29, 2024
1 parent e0e6ee0 commit f714a0e
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion tests/unit/test_operations_queue.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import threading

import pytest
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import (
UpdateRunSnapshot,
Value,
)
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.core.components.operations_queue import OperationsQueue


def test_operations_queue():
def test__enqueue():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=0)
Expand All @@ -24,3 +29,34 @@ def test_operations_queue():

# then
assert queue._sequence_id == 2


def test__max_queue_size_exceeded():
# given
lock = threading.RLock()
callback = lambda _, __: 4 / 0
queue = OperationsQueue(lock=lock, max_size=1, max_size_exceeded_callback=callback)

# and
operation = RunOperation()

# when
queue.enqueue(operation=operation)

# then
with pytest.raises(ValueError):
queue.enqueue(operation=operation)


def test__max_element_size_exceeded():
# given
lock = threading.RLock()
queue = OperationsQueue(lock=lock, max_size=1)

# and
snapshot = UpdateRunSnapshot(assign={f"key_{i}": Value(string=("a" * 1024)) for i in range(1024)})
operation = RunOperation(update=snapshot)

# then
with pytest.raises(ValueError):
queue.enqueue(operation=operation)

0 comments on commit f714a0e

Please sign in to comment.