Skip to content

Commit

Permalink
Merge pull request #111 from neptune-ai/pl/simplify-test
Browse files Browse the repository at this point in the history
chore: use fixture to limit duplication
  • Loading branch information
pawel-ludwikow authored Jan 8, 2025
2 parents f86216d + fdd5383 commit 0c88bd1
Showing 1 changed file with 41 additions and 93 deletions.
134 changes: 41 additions & 93 deletions tests/unit/test_sync_process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import queue
import time
from dataclasses import dataclass
from typing import Any
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -37,8 +39,18 @@ def single_operation(update: UpdateRunSnapshot, sequence_id):
)


def test_sender_thread_work_finishes_when_queue_empty():
# given
@dataclass
class MockedSender:
operations_queue: Any
status_tracking_queue: Any
errors_queue: Any
last_queue_seq: Any
backend: Any
sender_thread: Any


@pytest.fixture
def sender() -> MockedSender:
operations_queue = Mock()
status_tracking_queue = Mock()
errors_queue = Mock()
Expand All @@ -55,165 +67,101 @@ def test_sender_thread_work_finishes_when_queue_empty():
)
sender_thread._backend = backend

# and
operations_queue.get.side_effect = queue.Empty
return MockedSender(operations_queue, status_tracking_queue, errors_queue, last_queue_seq, backend, sender_thread)


def test_sender_thread_work_finishes_when_queue_empty(sender):
# given
sender.operations_queue.get.side_effect = queue.Empty

# when
sender_thread.work()
sender.sender_thread.work()

# then
assert True


def test_sender_thread_processes_single_element():
def test_sender_thread_processes_single_element(sender):
# given
operations_queue = Mock()
status_tracking_queue = Mock()
errors_queue = Mock()
last_queue_seq = SharedInt(initial_value=0)
backend = Mock()
sender_thread = SenderThread(
api_token="",
family="",
operations_queue=operations_queue,
status_tracking_queue=status_tracking_queue,
errors_queue=errors_queue,
last_queued_seq=last_queue_seq,
mode="disabled",
)
sender_thread._backend = backend

# and
update = UpdateRunSnapshot(assign={"key": Value(string="a")})
element = single_operation(update, sequence_id=2)
operations_queue.get.side_effect = [
sender.operations_queue.get.side_effect = [
BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation),
queue.Empty,
]

# and
backend.submit.side_effect = [response(["1"])]
sender.backend.submit.side_effect = [response(["1"])]

# when
sender_thread.work()
sender.sender_thread.work()

# then
assert backend.submit.call_count == 1
assert sender.backend.submit.call_count == 1


def test_sender_thread_processes_element_on_single_retryable_error():
def test_sender_thread_processes_element_on_single_retryable_error(sender):
# given
operations_queue = Mock()
status_tracking_queue = Mock()
errors_queue = Mock()
last_queue_seq = SharedInt(initial_value=0)
backend = Mock()
sender_thread = SenderThread(
api_token="",
family="",
operations_queue=operations_queue,
status_tracking_queue=status_tracking_queue,
errors_queue=errors_queue,
last_queued_seq=last_queue_seq,
mode="disabled",
)
sender_thread._backend = backend

# and
update = UpdateRunSnapshot(assign={"key": Value(string="a")})
element = single_operation(update, sequence_id=2)
operations_queue.get.side_effect = [
sender.operations_queue.get.side_effect = [
BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation),
queue.Empty,
]

# and
backend.submit.side_effect = [
sender.backend.submit.side_effect = [
response([], status_code=503),
response(["a"], status_code=200),
]

# when
sender_thread.work()
sender.sender_thread.work()

# then
assert backend.submit.call_count == 2
assert sender.backend.submit.call_count == 2


def test_sender_thread_fails_on_regular_error():
def test_sender_thread_fails_on_regular_error(sender):
# given
operations_queue = Mock()
status_tracking_queue = Mock()
errors_queue = Mock()
last_queue_seq = SharedInt(initial_value=0)
backend = Mock()
sender_thread = SenderThread(
api_token="",
family="",
operations_queue=operations_queue,
status_tracking_queue=status_tracking_queue,
errors_queue=errors_queue,
last_queued_seq=last_queue_seq,
mode="disabled",
)
sender_thread._backend = backend

# and
update = UpdateRunSnapshot(assign={"key": Value(string="a")})
element = single_operation(update, sequence_id=2)
operations_queue.get.side_effect = [
sender.operations_queue.get.side_effect = [
BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation),
queue.Empty,
]

# and
backend.submit.side_effect = [
sender.backend.submit.side_effect = [
response([], status_code=200),
]

# when
with pytest.raises(NeptuneSynchronizationStopped):
sender_thread.work()
sender.sender_thread.work()

# then should throw NeptuneInternalServerError
errors_queue.put.assert_called_once()
sender.errors_queue.put.assert_called_once()


def test_sender_thread_processes_element_on_429_and_408_http_statuses():
def test_sender_thread_processes_element_on_429_and_408_http_statuses(sender):
# given
operations_queue = Mock()
status_tracking_queue = Mock()
errors_queue = Mock()
last_queue_seq = SharedInt(initial_value=0)
backend = Mock()
sender_thread = SenderThread(
api_token="",
family="",
operations_queue=operations_queue,
status_tracking_queue=status_tracking_queue,
errors_queue=errors_queue,
last_queued_seq=last_queue_seq,
mode="disabled",
)
sender_thread._backend = backend

# and
update = UpdateRunSnapshot(assign={"key": Value(string="a")})
element = single_operation(update, sequence_id=2)
operations_queue.get.side_effect = [
sender.operations_queue.get.side_effect = [
BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation),
queue.Empty,
]

# and
backend.submit.side_effect = [
sender.backend.submit.side_effect = [
response([], status_code=408),
response([], status_code=429),
response(["a"], status_code=200),
]

# when
sender_thread.work()
sender.sender_thread.work()

# then
assert backend.submit.call_count == 3
assert sender.backend.submit.call_count == 3

0 comments on commit 0c88bd1

Please sign in to comment.