From 6c03001374dad7e7c3d18f86f29761c394e36314 Mon Sep 17 00:00:00 2001 From: Patryk Gala Date: Tue, 7 Jan 2025 16:16:14 +0100 Subject: [PATCH 01/21] chore: Do not invoke error callback on 408 and 429 http statuses. Use: - 408 -> network_error_callback - 429 -> warning_callback --- src/neptune_scale/exceptions.py | 10 ++++++ src/neptune_scale/sync/errors_tracking.py | 3 ++ src/neptune_scale/sync/sync_process.py | 9 +++++ tests/unit/test_errors_monitor.py | 2 ++ tests/unit/test_sync_process.py | 40 +++++++++++++++++++++++ 5 files changed, 64 insertions(+) diff --git a/src/neptune_scale/exceptions.py b/src/neptune_scale/exceptions.py index f1e6aa7e..f2d68da4 100644 --- a/src/neptune_scale/exceptions.py +++ b/src/neptune_scale/exceptions.py @@ -36,6 +36,7 @@ "NeptuneAsyncLagThresholdExceeded", "NeptuneProjectNotProvided", "NeptuneApiTokenNotProvided", + "NeptuneTooManyRequestsResponseError", ) from typing import Any @@ -191,6 +192,15 @@ class NeptuneUnexpectedResponseError(NeptuneRetryableError): """ +class NeptuneTooManyRequestsResponseError(NeptuneRetryableError): + message = """ +{h1} +NeptuneTooManyRequestsResponseError: The Neptune server returned 429 response. +{end} +This is a temporary problem. If the problem persists, please contact us at support@neptune.ai. +""" + + class NeptuneInternalServerError(NeptuneRetryableError): message = """ {h1} diff --git a/src/neptune_scale/sync/errors_tracking.py b/src/neptune_scale/sync/errors_tracking.py index fbf48b88..7b8f211f 100644 --- a/src/neptune_scale/sync/errors_tracking.py +++ b/src/neptune_scale/sync/errors_tracking.py @@ -14,6 +14,7 @@ NeptuneOperationsQueueMaxSizeExceeded, NeptuneScaleError, NeptuneScaleWarning, + NeptuneTooManyRequestsResponseError, NeptuneUnexpectedError, ) from neptune_scale.sync.parameters import ERRORS_MONITOR_THREAD_SLEEP_TIME @@ -109,6 +110,8 @@ def work(self) -> None: self._on_async_lag_callback() elif isinstance(error, NeptuneScaleWarning): self._on_warning_callback(error, last_raised_at) + elif isinstance(error, NeptuneTooManyRequestsResponseError): + self._on_warning_callback(error, last_raised_at) elif isinstance(error, NeptuneScaleError): self._on_error_callback(error, last_raised_at) else: diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py index 2a08f916..c63bcfd6 100644 --- a/src/neptune_scale/sync/sync_process.py +++ b/src/neptune_scale/sync/sync_process.py @@ -54,6 +54,7 @@ NeptuneStringSetExceedsSizeLimit, NeptuneStringValueExceedsSizeLimit, NeptuneSynchronizationStopped, + NeptuneTooManyRequestsResponseError, NeptuneUnauthorizedError, NeptuneUnexpectedError, NeptuneUnexpectedResponseError, @@ -410,6 +411,10 @@ def submit(self, *, operation: RunOperation) -> Optional[SubmitResponse]: logger.error("HTTP response error: %s", response.status_code) if response.status_code // 100 == 5: raise NeptuneInternalServerError() + elif response.status_code == 429: + raise NeptuneTooManyRequestsResponseError() + elif response.status_code == 408: + raise NeptuneConnectionLostError() else: raise NeptuneUnexpectedResponseError() @@ -503,6 +508,10 @@ def check_batch(self, *, request_ids: list[str]) -> Optional[BulkRequestStatus]: logger.error("HTTP response error: %s", response.status_code) if response.status_code // 100 == 5: raise NeptuneInternalServerError() + elif response.status_code == 429: + raise NeptuneTooManyRequestsResponseError() + elif response.status_code == 408: + raise NeptuneConnectionLostError() else: raise NeptuneUnexpectedResponseError() diff --git 
a/tests/unit/test_errors_monitor.py b/tests/unit/test_errors_monitor.py index ba2bc872..a4ae4886 100644 --- a/tests/unit/test_errors_monitor.py +++ b/tests/unit/test_errors_monitor.py @@ -11,6 +11,7 @@ NeptuneScaleError, NeptuneScaleWarning, NeptuneSeriesPointDuplicate, + NeptuneTooManyRequestsResponseError, ) from neptune_scale.sync.errors_tracking import ( ErrorsMonitor, @@ -29,6 +30,7 @@ (NeptuneOperationsQueueMaxSizeExceeded("error5"), "on_queue_full_callback"), (NeptuneConnectionLostError("error6"), "on_network_error_callback"), (NeptuneAsyncLagThresholdExceeded("error7"), "on_async_lag_callback"), + (NeptuneTooManyRequestsResponseError(), "on_warning_callback"), ], ) def test_errors_monitor_callbacks_called(error, callback_name): diff --git a/tests/unit/test_sync_process.py b/tests/unit/test_sync_process.py index c1140b18..063b56d5 100644 --- a/tests/unit/test_sync_process.py +++ b/tests/unit/test_sync_process.py @@ -177,3 +177,43 @@ def test_sender_thread_fails_on_regular_error(): # then should throw NeptuneInternalServerError errors_queue.put.assert_called_once() + + +def test_sender_thread_processes_element_on_429_and_408_http_statuses(): + # given + operations_queue = Mock() + status_tracking_queue = Mock() + errors_queue = Mock() + last_queue_seq = SharedInt(initial_value=0) + backend = Mock() + sender_thread = SenderThread( + api_token="", + family="", + operations_queue=operations_queue, + status_tracking_queue=status_tracking_queue, + errors_queue=errors_queue, + last_queued_seq=last_queue_seq, + mode="disabled", + ) + sender_thread._backend = backend + + # and + update = UpdateRunSnapshot(assign={"key": Value(string="a")}) + element = single_operation(update, sequence_id=2) + operations_queue.get.side_effect = [ + BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation), + queue.Empty, + ] + + # and + backend.submit.side_effect = [ + response([], status_code=408), + response([], status_code=429), + response(["a"], status_code=200), + ] + + # when + sender_thread.work() + + # then + assert backend.submit.call_count == 3 From 63d85002da9ec73bea0e4dc13f5d25c30ac47bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Ludwik=C3=B3w?= Date: Tue, 7 Jan 2025 16:25:17 +0100 Subject: [PATCH 02/21] chore: use fixture to limit duplication --- tests/unit/test_sync_process.py | 107 +++++++++++--------------------- 1 file changed, 36 insertions(+), 71 deletions(-) diff --git a/tests/unit/test_sync_process.py b/tests/unit/test_sync_process.py index c1140b18..94a83cd3 100644 --- a/tests/unit/test_sync_process.py +++ b/tests/unit/test_sync_process.py @@ -1,5 +1,7 @@ import queue import time +from dataclasses import dataclass +from typing import Any from unittest.mock import Mock import pytest @@ -37,8 +39,18 @@ def single_operation(update: UpdateRunSnapshot, sequence_id): ) -def test_sender_thread_work_finishes_when_queue_empty(): - # given +@dataclass +class MockedSender: + operations_queue: Any + status_tracking_queue: Any + errors_queue: Any + last_queue_seq: Any + backend: Any + sender_thread: Any + + +@pytest.fixture +def sender() -> MockedSender: operations_queue = Mock() status_tracking_queue = Mock() errors_queue = Mock() @@ -55,125 +67,78 @@ def test_sender_thread_work_finishes_when_queue_empty(): ) sender_thread._backend = backend - # and - operations_queue.get.side_effect = queue.Empty + return MockedSender(operations_queue, status_tracking_queue, errors_queue, last_queue_seq, backend, sender_thread) + + +def 
test_sender_thread_work_finishes_when_queue_empty(sender): + # given + sender.operations_queue.get.side_effect = queue.Empty # when - sender_thread.work() + sender.sender_thread.work() # then assert True -def test_sender_thread_processes_single_element(): +def test_sender_thread_processes_single_element(sender): # given - operations_queue = Mock() - status_tracking_queue = Mock() - errors_queue = Mock() - last_queue_seq = SharedInt(initial_value=0) - backend = Mock() - sender_thread = SenderThread( - api_token="", - family="", - operations_queue=operations_queue, - status_tracking_queue=status_tracking_queue, - errors_queue=errors_queue, - last_queued_seq=last_queue_seq, - mode="disabled", - ) - sender_thread._backend = backend - - # and update = UpdateRunSnapshot(assign={"key": Value(string="a")}) element = single_operation(update, sequence_id=2) - operations_queue.get.side_effect = [ + sender.operations_queue.get.side_effect = [ BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation), queue.Empty, ] # and - backend.submit.side_effect = [response(["1"])] + sender.backend.submit.side_effect = [response(["1"])] # when - sender_thread.work() + sender.sender_thread.work() # then - assert backend.submit.call_count == 1 + assert sender.backend.submit.call_count == 1 -def test_sender_thread_processes_element_on_single_retryable_error(): +def test_sender_thread_processes_element_on_single_retryable_error(sender): # given - operations_queue = Mock() - status_tracking_queue = Mock() - errors_queue = Mock() - last_queue_seq = SharedInt(initial_value=0) - backend = Mock() - sender_thread = SenderThread( - api_token="", - family="", - operations_queue=operations_queue, - status_tracking_queue=status_tracking_queue, - errors_queue=errors_queue, - last_queued_seq=last_queue_seq, - mode="disabled", - ) - sender_thread._backend = backend - - # and update = UpdateRunSnapshot(assign={"key": Value(string="a")}) element = single_operation(update, sequence_id=2) - operations_queue.get.side_effect = [ + sender.operations_queue.get.side_effect = [ BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation), queue.Empty, ] # and - backend.submit.side_effect = [ + sender.backend.submit.side_effect = [ response([], status_code=503), response(["a"], status_code=200), ] # when - sender_thread.work() + sender.sender_thread.work() # then - assert backend.submit.call_count == 2 + assert sender.backend.submit.call_count == 2 -def test_sender_thread_fails_on_regular_error(): +def test_sender_thread_fails_on_regular_error(sender): # given - operations_queue = Mock() - status_tracking_queue = Mock() - errors_queue = Mock() - last_queue_seq = SharedInt(initial_value=0) - backend = Mock() - sender_thread = SenderThread( - api_token="", - family="", - operations_queue=operations_queue, - status_tracking_queue=status_tracking_queue, - errors_queue=errors_queue, - last_queued_seq=last_queue_seq, - mode="disabled", - ) - sender_thread._backend = backend - - # and update = UpdateRunSnapshot(assign={"key": Value(string="a")}) element = single_operation(update, sequence_id=2) - operations_queue.get.side_effect = [ + sender.operations_queue.get.side_effect = [ BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation), queue.Empty, ] # and - backend.submit.side_effect = [ + sender.backend.submit.side_effect = [ response([], status_code=200), ] # when with 
pytest.raises(NeptuneSynchronizationStopped): - sender_thread.work() + sender.sender_thread.work() # then should throw NeptuneInternalServerError - errors_queue.put.assert_called_once() + sender.errors_queue.put.assert_called_once() From 42bdc3e98e9012f89523ea0ed0bd0e1e6b682171 Mon Sep 17 00:00:00 2001 From: PatrykGala Date: Tue, 7 Jan 2025 16:56:32 +0100 Subject: [PATCH 03/21] Apply suggestions from code review Co-authored-by: Krzysztof Godlewski --- src/neptune_scale/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/neptune_scale/exceptions.py b/src/neptune_scale/exceptions.py index f2d68da4..61510384 100644 --- a/src/neptune_scale/exceptions.py +++ b/src/neptune_scale/exceptions.py @@ -195,7 +195,7 @@ class NeptuneUnexpectedResponseError(NeptuneRetryableError): class NeptuneTooManyRequestsResponseError(NeptuneRetryableError): message = """ {h1} -NeptuneTooManyRequestsResponseError: The Neptune server returned 429 response. +NeptuneTooManyRequestsResponseError: The Neptune server reported receiving too many requests. {end} This is a temporary problem. If the problem persists, please contact us at support@neptune.ai. """ From 246693cb24d5f9e9001e2a4ab83ca093ab5fe658 Mon Sep 17 00:00:00 2001 From: Patryk Gala Date: Tue, 7 Jan 2025 17:05:06 +0100 Subject: [PATCH 04/21] chore: MR changes --- src/neptune_scale/sync/errors_tracking.py | 3 ++ src/neptune_scale/sync/sync_process.py | 47 ++++++++++------------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/neptune_scale/sync/errors_tracking.py b/src/neptune_scale/sync/errors_tracking.py index 7b8f211f..c7f51bb0 100644 --- a/src/neptune_scale/sync/errors_tracking.py +++ b/src/neptune_scale/sync/errors_tracking.py @@ -12,6 +12,7 @@ NeptuneAsyncLagThresholdExceeded, NeptuneConnectionLostError, NeptuneOperationsQueueMaxSizeExceeded, + NeptuneRetryableError, NeptuneScaleError, NeptuneScaleWarning, NeptuneTooManyRequestsResponseError, @@ -112,6 +113,8 @@ def work(self) -> None: self._on_warning_callback(error, last_raised_at) elif isinstance(error, NeptuneTooManyRequestsResponseError): self._on_warning_callback(error, last_raised_at) + elif isinstance(error, NeptuneRetryableError): + self._on_warning_callback(error, last_raised_at) elif isinstance(error, NeptuneScaleError): self._on_error_callback(error, last_raised_at) else: diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py index c63bcfd6..4329a2aa 100644 --- a/src/neptune_scale/sync/sync_process.py +++ b/src/neptune_scale/sync/sync_process.py @@ -404,19 +404,9 @@ def submit(self, *, operation: RunOperation) -> Optional[SubmitResponse]: response = self._backend.submit(operation=operation, family=self._family) - if response.status_code == 403: - raise NeptuneUnauthorizedError() - - if response.status_code != 200: - logger.error("HTTP response error: %s", response.status_code) - if response.status_code // 100 == 5: - raise NeptuneInternalServerError() - elif response.status_code == 429: - raise NeptuneTooManyRequestsResponseError() - elif response.status_code == 408: - raise NeptuneConnectionLostError() - else: - raise NeptuneUnexpectedResponseError() + status_code = response.status_code + if status_code != 200: + _raise_exception(status_code) return response.parsed @@ -458,6 +448,20 @@ def work(self) -> None: raise NeptuneSynchronizationStopped() from e +def _raise_exception(status_code: int) -> None: + logger.error("HTTP response error: %s", status_code) + if status_code == 403: + raise 
NeptuneUnauthorizedError() + elif status_code == 408: + raise NeptuneConnectionLostError() + elif status_code == 429: + raise NeptuneTooManyRequestsResponseError() + elif status_code // 100 == 5: + raise NeptuneInternalServerError() + else: + raise NeptuneUnexpectedResponseError() + + class StatusTrackingThread(Daemon, WithResources): def __init__( self, @@ -501,19 +505,10 @@ def check_batch(self, *, request_ids: list[str]) -> Optional[BulkRequestStatus]: response = self._backend.check_batch(request_ids=request_ids, project=self._project) - if response.status_code == 403: - raise NeptuneUnauthorizedError() - - if response.status_code != 200: - logger.error("HTTP response error: %s", response.status_code) - if response.status_code // 100 == 5: - raise NeptuneInternalServerError() - elif response.status_code == 429: - raise NeptuneTooManyRequestsResponseError() - elif response.status_code == 408: - raise NeptuneConnectionLostError() - else: - raise NeptuneUnexpectedResponseError() + status_code = response.status_code + + if status_code != 200: + _raise_exception(status_code) return response.parsed From b89b8ca21e5a25919436d501452afd97e108b16e Mon Sep 17 00:00:00 2001 From: Patryk Gala Date: Tue, 7 Jan 2025 17:10:49 +0100 Subject: [PATCH 05/21] chore: MR changes --- tests/unit/test_errors_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_errors_monitor.py b/tests/unit/test_errors_monitor.py index a4ae4886..fe11b78e 100644 --- a/tests/unit/test_errors_monitor.py +++ b/tests/unit/test_errors_monitor.py @@ -23,7 +23,7 @@ ["error", "callback_name"], [ (NeptuneScaleError("error1"), "on_error_callback"), - (NeptuneRetryableError("error1"), "on_error_callback"), + (NeptuneRetryableError("error1"), "on_warning_callback"), (ValueError("error2"), "on_error_callback"), (NeptuneScaleWarning("error3"), "on_warning_callback"), (NeptuneSeriesPointDuplicate("error4"), "on_warning_callback"), From 539d08589e7ea511e17b5ace291d28a0841088d6 Mon Sep 17 00:00:00 2001 From: PatrykGala Date: Tue, 7 Jan 2025 17:20:29 +0100 Subject: [PATCH 06/21] Update neptune-api=^0.10.0 --- dev_requirements.txt | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index e6cb5378..0670678b 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -7,4 +7,4 @@ pytest pytest-timeout pytest-xdist freezegun -neptune-fetcher +neptune-fetcher @ git+https://github.com/neptune-ai/neptune-fetcher.git@7912dfbbffcc676079870733572553116e569efa diff --git a/pyproject.toml b/pyproject.toml index f2953e94..5d6bb78e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ pattern = "default-unprefixed" [tool.poetry.dependencies] python = "^3.9" -neptune-api = "^0.9.0" +neptune-api = "^0.10.0" more-itertools = "^10.0.0" psutil = "^5.0.0" backoff = "^2.0.0" From 5a9a90e5622ff56d1cbf42fbcf383501ca4b5a03 Mon Sep 17 00:00:00 2001 From: Siddhant Sadangi Date: Tue, 7 Jan 2025 18:14:58 +0100 Subject: [PATCH 07/21] Update CHANGELOG.md --- CHANGELOG.md | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 556c1cf8..208b7ba4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
-## [0.8.0] - 2024-11-26 +## 0.9.0 - 2025-01-07 + +### Changes +* Removed support for Python 3.8 (https://github.com/neptune-ai/neptune-client-scale/pull/105) + +### Added +* Added `projects.list_projects()` method to list projects accessible to the current user (https://github.com/neptune-ai/neptune-client-scale/pull/97) + +### Fixed +* Fixed retry behavior on encountering a `NeptuneRetryableError` (https://github.com/neptune-ai/neptune-client-scale/pull/99) +* Fixed batching of metrics when logged with steps out of order (https://github.com/neptune-ai/neptune-client-scale/pull/91) + +### Chores +* Not invoking `on_error_callback` on encountering 408 and 429 HTTP statuses (https://github.com/neptune-ai/neptune-client-scale/pull/110) + +## 0.8.0 - 2024-11-26 ### Added - Added function `neptune_scale.projects.create_project()` to programmatically create Neptune projects ([#92](https://github.com/neptune-ai/neptune-client-scale/pull/92)) @@ -24,7 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Fixed batching of steps ([#82](https://github.com/neptune-ai/neptune-client-scale/pull/82)) -## [0.7.2] - 2024-11-07 +## 0.7.2 - 2024-11-07 ### Added @@ -32,63 +47,54 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Tuple support for tags ([#67](https://github.com/neptune-ai/neptune-client-scale/pull/67)) ### Changed - - Performance improvements - Change the logger's configuration to be more resilient ([#66](https://github.com/neptune-ai/neptune-client-scale/pull/66)) - Update docs: info about timestamp and timezones ([#69](https://github.com/neptune-ai/neptune-client-scale/pull/69)) - Strip quotes from the `NEPTUNE_PROJECT` env variable ([#51](https://github.com/neptune-ai/neptune-client-scale/pull/51)) -## [0.7.1] - 2024-10-28 +## 0.7.1 - 2024-10-28 ### Changed - Removed `family` from run initialization parameters ([#62](https://github.com/neptune-ai/neptune-client-scale/pull/62)) - Made `timestamp` keyword-only in `log_metrics()` ([#58](https://github.com/neptune-ai/neptune-client-scale/pull/58)) -## [0.6.3] - 2024-10-23 +## 0.6.3 - 2024-10-23 ### Changed - - Changed the signature of `Run.log_metrics`: - `date` is now the first parameter in line with other logging methods ([#58](https://github.com/neptune-ai/neptune-client-scale/pull/58)) - `step` and `data` are now mandatory ([#55](https://github.com/neptune-ai/neptune-client-scale/pull/55)) - - Removed iterables from `log_config` value type hints ([#53](https://github.com/neptune-ai/neptune-client-scale/pull/53)) -## [0.6.0] - 2024-09-09 +## 0.6.0 - 2024-09-09 ### Added - - Dedicated exceptions for missing project or API token ([#44](https://github.com/neptune-ai/neptune-client-scale/pull/44)) ### Changed - - Removed `timestamp` parameter from `add_tags()`, `remove_tags()` and `log_configs()` methods ([#37](https://github.com/neptune-ai/neptune-client-scale/pull/37)) - Performance improvements of metadata logging ([#42](https://github.com/neptune-ai/neptune-client-scale/pull/42)) ## [0.5.0] - 2024-09-05 ### Added - - Added docstrings to logging methods ([#40](https://github.com/neptune-ai/neptune-client-scale/pull/40)) ## [0.4.0] - 2024-09-03 ### Added - - Added support for integer values when logging metric values ([#33](https://github.com/neptune-ai/neptune-client-scale/pull/33)) - Added support for async lag threshold ([#22](https://github.com/neptune-ai/neptune-client-scale/pull/22)) ## [0.3.0] - 2024-09-03 ### Added - - Package renamed to 
`neptune-scale` ([#31](https://github.com/neptune-ai/neptune-client-scale/pull/31)) ## [0.2.0] - 2024-09-02 ### Added - - Added minimal Run classes ([#6](https://github.com/neptune-ai/neptune-client-scale/pull/6)) - Added support for `max_queue_size` and `max_queue_size_exceeded_callback` parameters in `Run` ([#7](https://github.com/neptune-ai/neptune-client-scale/pull/7)) - Added support for logging metadata ([#8](https://github.com/neptune-ai/neptune-client-scale/pull/8)) From fdd53832e767abb126517570f95d82fceb6410cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Ludwik=C3=B3w?= Date: Tue, 7 Jan 2025 18:32:19 +0100 Subject: [PATCH 08/21] chore: use fixture to limit duplication in a new test --- tests/unit/test_sync_process.py | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/tests/unit/test_sync_process.py b/tests/unit/test_sync_process.py index dc630900..1709510a 100644 --- a/tests/unit/test_sync_process.py +++ b/tests/unit/test_sync_process.py @@ -144,41 +144,24 @@ def test_sender_thread_fails_on_regular_error(sender): sender.errors_queue.put.assert_called_once() -def test_sender_thread_processes_element_on_429_and_408_http_statuses(): +def test_sender_thread_processes_element_on_429_and_408_http_statuses(sender): # given - operations_queue = Mock() - status_tracking_queue = Mock() - errors_queue = Mock() - last_queue_seq = SharedInt(initial_value=0) - backend = Mock() - sender_thread = SenderThread( - api_token="", - family="", - operations_queue=operations_queue, - status_tracking_queue=status_tracking_queue, - errors_queue=errors_queue, - last_queued_seq=last_queue_seq, - mode="disabled", - ) - sender_thread._backend = backend - - # and update = UpdateRunSnapshot(assign={"key": Value(string="a")}) element = single_operation(update, sequence_id=2) - operations_queue.get.side_effect = [ + sender.operations_queue.get.side_effect = [ BatchedOperations(sequence_id=element.sequence_id, timestamp=element.timestamp, operation=element.operation), queue.Empty, ] # and - backend.submit.side_effect = [ + sender.backend.submit.side_effect = [ response([], status_code=408), response([], status_code=429), response(["a"], status_code=200), ] # when - sender_thread.work() + sender.sender_thread.work() # then - assert backend.submit.call_count == 3 + assert sender.backend.submit.call_count == 3 From f86216d4ada741c5007b9fbda43e80b7a897d61f Mon Sep 17 00:00:00 2001 From: Siddhant Sadangi Date: Wed, 8 Jan 2025 11:16:16 +0100 Subject: [PATCH 09/21] Update pyproject.toml (#114) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5d6bb78e..b2e285cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ backoff = "^2.0.0" [tool.poetry] name = "neptune-scale" version = "0.1.0" -description = "A minimal client library" +description = "Python logging API for Neptune Scale" authors = ["neptune.ai "] repository = "https://github.com/neptune-ai/neptune-client-scale" readme = "README.md" From ee4fab868504b70845f2ded4339fca62937ae53e Mon Sep 17 00:00:00 2001 From: PatrykGala Date: Fri, 10 Jan 2025 11:37:37 +0100 Subject: [PATCH 10/21] Update neptune-fetcher in e2e --- dev_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index 0670678b..e6cb5378 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -7,4 +7,4 @@ pytest pytest-timeout pytest-xdist freezegun -neptune-fetcher @ 
git+https://github.com/neptune-ai/neptune-fetcher.git@7912dfbbffcc676079870733572553116e569efa +neptune-fetcher From aecddeb87934be3414605e949a8c52dcf214cb8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20So=C5=9Bnicki?= Date: Thu, 9 Jan 2025 19:14:25 +0100 Subject: [PATCH 11/21] fix: retain operation timestamp while batching --- src/neptune_scale/api/attribute.py | 2 +- src/neptune_scale/sync/aggregating_queue.py | 9 +- src/neptune_scale/sync/operations_queue.py | 3 +- src/neptune_scale/sync/queue_element.py | 3 +- tests/unit/test_aggregating_queue.py | 97 +++++++++++++++------ 5 files changed, 79 insertions(+), 35 deletions(-) diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py index ea433137..3b70672c 100644 --- a/src/neptune_scale/api/attribute.py +++ b/src/neptune_scale/api/attribute.py @@ -100,7 +100,7 @@ def log( ) for operation, metadata_size in splitter: - self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step) + self._operations_queue.enqueue(operation=operation, size=metadata_size, key=(step, timestamp.timestamp())) class Attribute: diff --git a/src/neptune_scale/sync/aggregating_queue.py b/src/neptune_scale/sync/aggregating_queue.py index 8e3fc40e..e63b6c62 100644 --- a/src/neptune_scale/sync/aggregating_queue.py +++ b/src/neptune_scale/sync/aggregating_queue.py @@ -8,7 +8,10 @@ Queue, ) from threading import RLock -from typing import Optional +from typing import ( + Any, + Optional, +) from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation @@ -71,7 +74,7 @@ def commit(self) -> None: def get(self) -> BatchedOperations: start = time.monotonic() - batch_operations: dict[Optional[float], RunOperation] = {} + batch_operations: dict[Any, RunOperation] = {} batch_sequence_id: Optional[int] = None batch_timestamp: Optional[float] = None @@ -157,7 +160,7 @@ def get(self) -> BatchedOperations: ) -def create_run_batch(operations: dict[Optional[float], RunOperation]) -> RunOperation: +def create_run_batch(operations: dict[Any, RunOperation]) -> RunOperation: if len(operations) == 1: return next(iter(operations.values())) diff --git a/src/neptune_scale/sync/operations_queue.py b/src/neptune_scale/sync/operations_queue.py index 0069b472..dd1dcc60 100644 --- a/src/neptune_scale/sync/operations_queue.py +++ b/src/neptune_scale/sync/operations_queue.py @@ -6,6 +6,7 @@ from time import monotonic from typing import ( TYPE_CHECKING, + Any, Optional, ) @@ -57,7 +58,7 @@ def last_timestamp(self) -> Optional[float]: with self._lock: return self._last_timestamp - def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[float] = None) -> None: + def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[Any] = None) -> None: try: is_metadata_update = operation.HasField("update") serialized_operation = operation.SerializeToString() diff --git a/src/neptune_scale/sync/queue_element.py b/src/neptune_scale/sync/queue_element.py index 1c37ff11..f2752079 100644 --- a/src/neptune_scale/sync/queue_element.py +++ b/src/neptune_scale/sync/queue_element.py @@ -1,6 +1,7 @@ __all__ = ("BatchedOperations", "SingleOperation") from typing import ( + Any, NamedTuple, Optional, ) @@ -27,4 +28,4 @@ class SingleOperation(NamedTuple): # Size of the metadata in the operation (without project, family, run_id etc.) 
metadata_size: Optional[int] # Update metadata key - batch_key: Optional[float] + batch_key: Optional[Any] diff --git a/tests/unit/test_aggregating_queue.py b/tests/unit/test_aggregating_queue.py index de9394a0..c2c5a3b1 100644 --- a/tests/unit/test_aggregating_queue.py +++ b/tests/unit/test_aggregating_queue.py @@ -6,6 +6,7 @@ import pytest from freezegun import freeze_time +from google.protobuf.timestamp_pb2 import Timestamp from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Run as CreateRun from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import ( Step, @@ -189,21 +190,23 @@ def test__queue_element_size_limit_with_different_steps(): update2 = UpdateRunSnapshot(step=Step(whole=2), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)}) operation1 = RunOperation(update=update1) operation2 = RunOperation(update=update2) + timestamp1 = time.process_time() + timestamp2 = timestamp1 + 1 element1 = SingleOperation( sequence_id=1, - timestamp=time.process_time(), + timestamp=timestamp1, operation=operation1.SerializeToString(), is_batchable=True, metadata_size=update1.ByteSize(), - batch_key=1.0, + batch_key=(1.0, timestamp1), ) element2 = SingleOperation( sequence_id=2, - timestamp=time.process_time(), + timestamp=timestamp2, operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update2.ByteSize(), - batch_key=2.0, + batch_key=(2.0, timestamp2), ) # and @@ -361,21 +364,23 @@ def test__merge_same_key(): operation2 = RunOperation(update=update2, project="project", run_id="run_id") # and + timestamp0 = time.process_time() + batch_key = (1.0, timestamp0) element1 = SingleOperation( sequence_id=1, - timestamp=time.process_time(), + timestamp=timestamp0, operation=operation1.SerializeToString(), is_batchable=True, metadata_size=update1.ByteSize(), - batch_key=1.0, + batch_key=batch_key, ) element2 = SingleOperation( sequence_id=2, - timestamp=time.process_time(), + timestamp=timestamp0, operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update2.ByteSize(), - batch_key=1.0, + batch_key=batch_key, ) # and @@ -390,7 +395,7 @@ def test__merge_same_key(): # then assert result.sequence_id == 2 - assert result.timestamp == element2.timestamp + assert result.timestamp == timestamp0 # and batch = RunOperation() @@ -403,7 +408,7 @@ def test__merge_same_key(): @freeze_time("2024-09-01") -def test__merge_two_different_steps(): +def test__batch_two_different_steps(): # given update1 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)}) update2 = UpdateRunSnapshot(step=Step(whole=2, micro=0), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)}) @@ -413,21 +418,23 @@ def test__merge_two_different_steps(): operation2 = RunOperation(update=update2, project="project", run_id="run_id") # and + timestamp1 = time.process_time() + timestamp2 = timestamp1 + 1 element1 = SingleOperation( sequence_id=1, - timestamp=time.process_time(), + timestamp=timestamp1, operation=operation1.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=1.0, + batch_key=(1.0, timestamp1), ) element2 = SingleOperation( sequence_id=2, - timestamp=time.process_time(), + timestamp=timestamp2, operation=operation2.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=2.0, + batch_key=(2.0, timestamp2), ) # and @@ -454,7 +461,7 @@ def test__merge_two_different_steps(): @freeze_time("2024-09-01") -def test__merge_step_with_none(): +def test__batch_step_with_none(): # given update1 = 
UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)}) update2 = UpdateRunSnapshot(step=None, assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)}) @@ -464,13 +471,14 @@ def test__merge_step_with_none(): operation2 = RunOperation(update=update2, project="project", run_id="run_id") # and + timestamp1 = time.process_time() element1 = SingleOperation( sequence_id=1, - timestamp=time.process_time(), + timestamp=timestamp1, operation=operation1.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=1.0, + batch_key=(1.0, timestamp1), ) element2 = SingleOperation( sequence_id=2, @@ -507,10 +515,27 @@ def test__merge_step_with_none(): @freeze_time("2024-09-01") def test__merge_two_steps_two_metrics(): # given - update1a = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={"aa": Value(int64=10)}) - update2a = UpdateRunSnapshot(step=Step(whole=2, micro=0), assign={"aa": Value(int64=20)}) - update1b = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={"bb": Value(int64=100)}) - update2b = UpdateRunSnapshot(step=Step(whole=2, micro=0), assign={"bb": Value(int64=200)}) + timestamp0 = int(time.process_time()) + update1a = UpdateRunSnapshot( + step=Step(whole=1, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 1, nanos=0), + assign={"aa": Value(int64=10)}, + ) + update2a = UpdateRunSnapshot( + step=Step(whole=2, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 2, nanos=0), + assign={"aa": Value(int64=20)}, + ) + update1b = UpdateRunSnapshot( + step=Step(whole=1, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 3, nanos=0), + assign={"bb": Value(int64=100)}, + ) + update2b = UpdateRunSnapshot( + step=Step(whole=2, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 4, nanos=0), + assign={"bb": Value(int64=200)}, + ) # and operations = [ @@ -522,13 +547,13 @@ def test__merge_two_steps_two_metrics(): elements = [ SingleOperation( sequence_id=sequence_id, - timestamp=time.process_time(), + timestamp=timestamp0 + sequence_id, operation=operation.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=batch_key, + batch_key=(step, timestamp0 + sequence_id), ) - for sequence_id, batch_key, operation in [ + for sequence_id, step, operation in [ (1, 1.0, operations[0]), (2, 2.0, operations[1]), (3, 1.0, operations[2]), @@ -554,13 +579,27 @@ def test__merge_two_steps_two_metrics(): batch = RunOperation() batch.ParseFromString(result.operation) - update1_merged = UpdateRunSnapshot( - step=Step(whole=1, micro=0), assign={"aa": Value(int64=10), "bb": Value(int64=100)} + update1 = UpdateRunSnapshot( + step=Step(whole=1, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 1, nanos=0), + assign={"aa": Value(int64=10)}, + ) + update2 = UpdateRunSnapshot( + step=Step(whole=1, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 3, nanos=0), + assign={"bb": Value(int64=100)}, + ) + update3 = UpdateRunSnapshot( + step=Step(whole=2, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 2, nanos=0), + assign={"aa": Value(int64=20)}, ) - update2_merged = UpdateRunSnapshot( - step=Step(whole=2, micro=0), assign={"aa": Value(int64=20), "bb": Value(int64=200)} + update4 = UpdateRunSnapshot( + step=Step(whole=2, micro=0), + timestamp=Timestamp(seconds=timestamp0 + 4, nanos=0), + assign={"bb": Value(int64=200)}, ) assert batch.project == "project" assert batch.run_id == "run_id" - assert batch.update_batch.snapshots == [update1_merged, update2_merged] + assert batch.update_batch.snapshots == [update1, update2, update3, 
update4] From f9a3219d037c090281228fb67db3713c67e8190d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20So=C5=9Bnicki?= Date: Thu, 9 Jan 2025 19:53:27 +0100 Subject: [PATCH 12/21] fix: remove merging, exclusively perform batching --- src/neptune_scale/api/attribute.py | 2 +- src/neptune_scale/sync/aggregating_queue.py | 67 +++++---------------- src/neptune_scale/sync/operations_queue.py | 4 +- src/neptune_scale/sync/queue_element.py | 3 - tests/unit/test_aggregating_queue.py | 59 +++--------------- tests/unit/test_sync_process.py | 1 - 6 files changed, 28 insertions(+), 108 deletions(-) diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py index 3b70672c..734bf3e5 100644 --- a/src/neptune_scale/api/attribute.py +++ b/src/neptune_scale/api/attribute.py @@ -100,7 +100,7 @@ def log( ) for operation, metadata_size in splitter: - self._operations_queue.enqueue(operation=operation, size=metadata_size, key=(step, timestamp.timestamp())) + self._operations_queue.enqueue(operation=operation, size=metadata_size) class Attribute: diff --git a/src/neptune_scale/sync/aggregating_queue.py b/src/neptune_scale/sync/aggregating_queue.py index e63b6c62..0bf47226 100644 --- a/src/neptune_scale/sync/aggregating_queue.py +++ b/src/neptune_scale/sync/aggregating_queue.py @@ -8,10 +8,7 @@ Queue, ) from threading import RLock -from typing import ( - Any, - Optional, -) +from typing import Optional from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation @@ -74,7 +71,7 @@ def commit(self) -> None: def get(self) -> BatchedOperations: start = time.monotonic() - batch_operations: dict[Any, RunOperation] = {} + batch_operations: list[RunOperation] = [] batch_sequence_id: Optional[int] = None batch_timestamp: Optional[float] = None @@ -98,7 +95,7 @@ def get(self) -> BatchedOperations: if not batch_operations: new_operation = RunOperation() new_operation.ParseFromString(element.operation) - batch_operations[element.batch_key] = new_operation + batch_operations.append(new_operation) batch_bytes += len(element.operation) else: if not element.is_batchable: @@ -113,10 +110,7 @@ def get(self) -> BatchedOperations: new_operation = RunOperation() new_operation.ParseFromString(element.operation) - if element.batch_key not in batch_operations: - batch_operations[element.batch_key] = new_operation - else: - merge_run_operation(batch_operations[element.batch_key], new_operation) + batch_operations.append(new_operation) batch_bytes += element.metadata_size batch_sequence_id = element.sequence_id @@ -160,54 +154,25 @@ def get(self) -> BatchedOperations: ) -def create_run_batch(operations: dict[Any, RunOperation]) -> RunOperation: +def create_run_batch(operations: list[RunOperation]) -> RunOperation: + if not operations: + raise Empty + if len(operations) == 1: - return next(iter(operations.values())) + return operations[0] - batch = None - for _, operation in sorted(operations.items(), key=lambda x: (x[0] is not None, x[0])): - if batch is None: - batch = RunOperation() - batch.project = operation.project - batch.run_id = operation.run_id - batch.create_missing_project = operation.create_missing_project - batch.api_key = operation.api_key + head = operations[0] + batch = RunOperation() + batch.project = head.project + batch.run_id = head.run_id + batch.create_missing_project = head.create_missing_project + batch.api_key = head.api_key + for operation in operations: operation_type = operation.WhichOneof("operation") if operation_type == "update": 
batch.update_batch.snapshots.append(operation.update) else: raise ValueError("Cannot batch operation of type %s", operation_type) - if batch is None: - raise Empty return batch - - -def merge_run_operation(batch: RunOperation, operation: RunOperation) -> None: - """ - Merge the `operation` into `batch`, taking into account the special case of `modify_sets`. - - Protobuf merges existing map keys by simply overwriting values, instead of calling - `MergeFrom` on the existing value, eg: A['foo'] = B['foo']. - - We want this instead: - - batch = {'sys/tags': 'string': { 'values': {'foo': ADD}}} - operation = {'sys/tags': 'string': { 'values': {'bar': ADD}}} - result = {'sys/tags': 'string': { 'values': {'foo': ADD, 'bar': ADD}}} - - If we called `batch.MergeFrom(operation)` we would get an overwritten value: - result = {'sys/tags': 'string': { 'values': {'bar': ADD}}} - - This function ensures that the `modify_sets` are merged correctly, leaving the default - behaviour for all other fields. - """ - - modify_sets = operation.update.modify_sets - operation.update.ClearField("modify_sets") - - batch.MergeFrom(operation) - - for k, v in modify_sets.items(): - batch.update.modify_sets[k].MergeFrom(v) diff --git a/src/neptune_scale/sync/operations_queue.py b/src/neptune_scale/sync/operations_queue.py index dd1dcc60..d518da11 100644 --- a/src/neptune_scale/sync/operations_queue.py +++ b/src/neptune_scale/sync/operations_queue.py @@ -6,7 +6,6 @@ from time import monotonic from typing import ( TYPE_CHECKING, - Any, Optional, ) @@ -58,7 +57,7 @@ def last_timestamp(self) -> Optional[float]: with self._lock: return self._last_timestamp - def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[Any] = None) -> None: + def enqueue(self, *, operation: RunOperation, size: Optional[int] = None) -> None: try: is_metadata_update = operation.HasField("update") serialized_operation = operation.SerializeToString() @@ -76,7 +75,6 @@ def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: O operation=serialized_operation, metadata_size=size, is_batchable=is_metadata_update, - batch_key=key, ), block=True, timeout=None, diff --git a/src/neptune_scale/sync/queue_element.py b/src/neptune_scale/sync/queue_element.py index f2752079..521f89e4 100644 --- a/src/neptune_scale/sync/queue_element.py +++ b/src/neptune_scale/sync/queue_element.py @@ -1,7 +1,6 @@ __all__ = ("BatchedOperations", "SingleOperation") from typing import ( - Any, NamedTuple, Optional, ) @@ -27,5 +26,3 @@ class SingleOperation(NamedTuple): is_batchable: bool # Size of the metadata in the operation (without project, family, run_id etc.) 
metadata_size: Optional[int] - # Update metadata key - batch_key: Optional[Any] diff --git a/tests/unit/test_aggregating_queue.py b/tests/unit/test_aggregating_queue.py index c2c5a3b1..0f736f57 100644 --- a/tests/unit/test_aggregating_queue.py +++ b/tests/unit/test_aggregating_queue.py @@ -33,7 +33,6 @@ def test__simple(): operation=operation.SerializeToString(), is_batchable=True, metadata_size=update.ByteSize(), - batch_key=None, ) # and @@ -61,7 +60,6 @@ def test__max_size_exceeded(): operation=operation1.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=None, ) element2 = SingleOperation( sequence_id=2, @@ -69,7 +67,6 @@ def test__max_size_exceeded(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=None, ) # and @@ -109,7 +106,6 @@ def test__batch_size_limit(): operation=operation1.SerializeToString(), is_batchable=True, metadata_size=update1.ByteSize(), - batch_key=None, ) element2 = SingleOperation( sequence_id=2, @@ -117,7 +113,6 @@ def test__batch_size_limit(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update2.ByteSize(), - batch_key=None, ) # and @@ -149,7 +144,6 @@ def test__batching(): operation=operation1.SerializeToString(), is_batchable=True, metadata_size=update1.ByteSize(), - batch_key=None, ) element2 = SingleOperation( sequence_id=2, @@ -157,7 +151,6 @@ def test__batching(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update2.ByteSize(), - batch_key=None, ) # and @@ -180,7 +173,8 @@ def test__batching(): assert batch.project == "project" assert batch.run_id == "run_id" - assert all(k in batch.update.assign for k in ["aa0", "aa1", "bb0", "bb1"]) + assert all(k in batch.update_batch.snapshots[0].assign for k in ["aa0", "aa1"]) + assert all(k in batch.update_batch.snapshots[1].assign for k in ["bb0", "bb1"]) @freeze_time("2024-09-01") @@ -198,7 +192,6 @@ def test__queue_element_size_limit_with_different_steps(): operation=operation1.SerializeToString(), is_batchable=True, metadata_size=update1.ByteSize(), - batch_key=(1.0, timestamp1), ) element2 = SingleOperation( sequence_id=2, @@ -206,7 +199,6 @@ def test__queue_element_size_limit_with_different_steps(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update2.ByteSize(), - batch_key=(2.0, timestamp2), ) # and @@ -238,7 +230,6 @@ def test__not_merge_two_run_creation(): operation=operation1.SerializeToString(), is_batchable=False, metadata_size=0, - batch_key=None, ) element2 = SingleOperation( sequence_id=2, @@ -246,7 +237,6 @@ def test__not_merge_two_run_creation(): operation=operation2.SerializeToString(), is_batchable=False, metadata_size=0, - batch_key=None, ) # and @@ -304,7 +294,6 @@ def test__not_merge_run_creation_with_metadata_update(): operation=operation1.SerializeToString(), is_batchable=False, metadata_size=0, - batch_key=None, ) element2 = SingleOperation( sequence_id=2, @@ -312,7 +301,6 @@ def test__not_merge_run_creation_with_metadata_update(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update.ByteSize(), - batch_key=None, ) # and @@ -354,7 +342,7 @@ def test__not_merge_run_creation_with_metadata_update(): @freeze_time("2024-09-01") -def test__merge_same_key(): +def test__batch_same_key(): # given update1 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)}) update2 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)}) 
@@ -365,14 +353,12 @@ def test__merge_same_key(): # and timestamp0 = time.process_time() - batch_key = (1.0, timestamp0) element1 = SingleOperation( sequence_id=1, timestamp=timestamp0, operation=operation1.SerializeToString(), is_batchable=True, metadata_size=update1.ByteSize(), - batch_key=batch_key, ) element2 = SingleOperation( sequence_id=2, @@ -380,7 +366,6 @@ def test__merge_same_key(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=update2.ByteSize(), - batch_key=batch_key, ) # and @@ -403,8 +388,10 @@ def test__merge_same_key(): assert batch.project == "project" assert batch.run_id == "run_id" - assert batch.update.step == Step(whole=1, micro=0) - assert all(k in batch.update.assign for k in ["aa0", "aa1", "bb0", "bb1"]) + assert batch.update_batch.snapshots[0].step == Step(whole=1, micro=0) + assert batch.update_batch.snapshots[1].step == Step(whole=1, micro=0) + assert all(k in batch.update_batch.snapshots[0].assign for k in ["aa0", "aa1"]) + assert all(k in batch.update_batch.snapshots[1].assign for k in ["bb0", "bb1"]) @freeze_time("2024-09-01") @@ -426,7 +413,6 @@ def test__batch_two_different_steps(): operation=operation1.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=(1.0, timestamp1), ) element2 = SingleOperation( sequence_id=2, @@ -434,7 +420,6 @@ def test__batch_two_different_steps(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=(2.0, timestamp2), ) # and @@ -478,7 +463,6 @@ def test__batch_step_with_none(): operation=operation1.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=(1.0, timestamp1), ) element2 = SingleOperation( sequence_id=2, @@ -486,7 +470,6 @@ def test__batch_step_with_none(): operation=operation2.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=None, ) # and @@ -509,11 +492,11 @@ def test__batch_step_with_none(): assert batch.project == "project" assert batch.run_id == "run_id" - assert batch.update_batch.snapshots == [update2, update1] # None is always first + assert batch.update_batch.snapshots == [update1, update2] @freeze_time("2024-09-01") -def test__merge_two_steps_two_metrics(): +def test__batch_two_steps_two_metrics(): # given timestamp0 = int(time.process_time()) update1a = UpdateRunSnapshot( @@ -551,7 +534,6 @@ def test__merge_two_steps_two_metrics(): operation=operation.SerializeToString(), is_batchable=True, metadata_size=0, - batch_key=(step, timestamp0 + sequence_id), ) for sequence_id, step, operation in [ (1, 1.0, operations[0]), @@ -579,27 +561,6 @@ def test__merge_two_steps_two_metrics(): batch = RunOperation() batch.ParseFromString(result.operation) - update1 = UpdateRunSnapshot( - step=Step(whole=1, micro=0), - timestamp=Timestamp(seconds=timestamp0 + 1, nanos=0), - assign={"aa": Value(int64=10)}, - ) - update2 = UpdateRunSnapshot( - step=Step(whole=1, micro=0), - timestamp=Timestamp(seconds=timestamp0 + 3, nanos=0), - assign={"bb": Value(int64=100)}, - ) - update3 = UpdateRunSnapshot( - step=Step(whole=2, micro=0), - timestamp=Timestamp(seconds=timestamp0 + 2, nanos=0), - assign={"aa": Value(int64=20)}, - ) - update4 = UpdateRunSnapshot( - step=Step(whole=2, micro=0), - timestamp=Timestamp(seconds=timestamp0 + 4, nanos=0), - assign={"bb": Value(int64=200)}, - ) - assert batch.project == "project" assert batch.run_id == "run_id" - assert batch.update_batch.snapshots == [update1, update2, update3, update4] + assert batch.update_batch.snapshots == [update1a, update2a, update1b, update2b] diff --git 
a/tests/unit/test_sync_process.py b/tests/unit/test_sync_process.py index 1709510a..25b17918 100644 --- a/tests/unit/test_sync_process.py +++ b/tests/unit/test_sync_process.py @@ -35,7 +35,6 @@ def single_operation(update: UpdateRunSnapshot, sequence_id): operation=operation.SerializeToString(), is_batchable=True, metadata_size=update.ByteSize(), - batch_key=None, ) From 21fdf668a1343c4f64aa01101407c5309f75c183 Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Thu, 9 Jan 2025 17:35:39 +0100 Subject: [PATCH 13/21] Add `util.ensure_api_token()` --- src/neptune_scale/net/projects.py | 18 +++--------------- src/neptune_scale/sync/util.py | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/neptune_scale/net/projects.py b/src/neptune_scale/net/projects.py index ce698ada..4a97de2d 100644 --- a/src/neptune_scale/net/projects.py +++ b/src/neptune_scale/net/projects.py @@ -1,4 +1,3 @@ -import os import re from enum import Enum from json import JSONDecodeError @@ -11,7 +10,6 @@ import httpx from neptune_scale.exceptions import ( - NeptuneApiTokenNotProvided, NeptuneBadRequestError, NeptuneProjectAlreadyExists, ) @@ -19,7 +17,7 @@ HostedApiClient, with_api_errors_handling, ) -from neptune_scale.util.envs import API_TOKEN_ENV_NAME +from neptune_scale.sync.util import ensure_api_token PROJECTS_PATH_BASE = "/api/backend/v1/projects" @@ -33,14 +31,6 @@ class ProjectVisibility(Enum): ORGANIZATION_NOT_FOUND_RE = re.compile(r"Organization .* not found") -def _get_api_token(api_token: Optional[str]) -> str: - api_token = api_token or os.environ.get(API_TOKEN_ENV_NAME) - if api_token is None: - raise NeptuneApiTokenNotProvided() - - return api_token - - @with_api_errors_handling def create_project( workspace: str, @@ -52,9 +42,7 @@ def create_project( fail_if_exists: bool = False, api_token: Optional[str] = None, ) -> None: - api_token = _get_api_token(api_token) - - client = HostedApiClient(api_token=api_token) + client = HostedApiClient(api_token=ensure_api_token(api_token)) visibility = ProjectVisibility(visibility) body = { @@ -92,7 +80,7 @@ def _safe_json(response: httpx.Response) -> Any: def get_project_list(*, api_token: Optional[str] = None) -> list[dict]: - client = HostedApiClient(api_token=_get_api_token(api_token)) + client = HostedApiClient(api_token=ensure_api_token(api_token)) params = { "userRelation": "viewerOrHigher", diff --git a/src/neptune_scale/sync/util.py b/src/neptune_scale/sync/util.py index 60fe4b0b..2d5ecf96 100644 --- a/src/neptune_scale/sync/util.py +++ b/src/neptune_scale/sync/util.py @@ -1,4 +1,9 @@ +import os import signal +from typing import Optional + +from neptune_scale.exceptions import NeptuneApiTokenNotProvided +from neptune_scale.util.envs import API_TOKEN_ENV_NAME def safe_signal_name(signum: int) -> str: @@ -8,3 +13,13 @@ def safe_signal_name(signum: int) -> str: signame = str(signum) return signame + + +def ensure_api_token(api_token: Optional[str]) -> str: + """Ensure the API token is provided via either explicit argument, or env variable.""" + + api_token = api_token or os.environ.get(API_TOKEN_ENV_NAME) + if api_token is None: + raise NeptuneApiTokenNotProvided() + + return api_token From dfda98c5ab0ec7f28d9a5ac2b555224ef954116e Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Fri, 10 Jan 2025 16:52:48 +0100 Subject: [PATCH 14/21] Add `.net.runs.run_exists()` --- .pre-commit-config.yaml | 2 +- src/neptune_scale/net/api_client.py | 29 +++++++++++++++++++++++++++++ src/neptune_scale/net/runs.py | 29 
+++++++++++++++++++++++++++++ src/neptune_scale/net/util.py | 6 ++++++ 4 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 src/neptune_scale/net/runs.py create mode 100644 src/neptune_scale/net/util.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b421be2a..73ef26dc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,7 +29,7 @@ repos: args: [ --config-file, pyproject.toml ] pass_filenames: false additional_dependencies: - - neptune-api==0.7.0b + - neptune-api - more-itertools - backoff default_language_version: diff --git a/src/neptune_scale/net/api_client.py b/src/neptune_scale/net/api_client.py index a6cf90c2..1bbcccfd 100644 --- a/src/neptune_scale/net/api_client.py +++ b/src/neptune_scale/net/api_client.py @@ -27,9 +27,11 @@ from typing import ( Any, Literal, + cast, ) import httpx +import neptune_retrieval_api.client from httpx import Timeout from neptune_api import ( AuthenticatedClient, @@ -64,6 +66,11 @@ from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation from neptune_api.proto.neptune_pb.ingest.v1.pub.request_status_pb2 import RequestStatus from neptune_api.types import Response +from neptune_retrieval_api.api.default import search_leaderboard_entries_proto +from neptune_retrieval_api.models import SearchLeaderboardEntriesParamsDTO +from neptune_retrieval_api.proto.neptune_pb.api.v1.model.leaderboard_entries_pb2 import ( + ProtoLeaderboardEntriesSearchResultDTO, +) from neptune_scale.exceptions import ( NeptuneConnectionLostError, @@ -129,6 +136,11 @@ def submit(self, operation: RunOperation, family: str) -> Response[SubmitRespons @abc.abstractmethod def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequestStatus]: ... + @abc.abstractmethod + def search_entries( + self, project_id: str, body: SearchLeaderboardEntriesParamsDTO + ) -> ProtoLeaderboardEntriesSearchResultDTO: ... + class HostedApiClient(ApiClient): def __init__(self, api_token: str) -> None: @@ -141,6 +153,9 @@ def __init__(self, api_token: str) -> None: self.backend = create_auth_api_client( credentials=credentials, config=config, token_refreshing_urls=token_urls, verify_ssl=verify_ssl ) + # This is required only to silence mypy. The two client objects are compatible, because they're + # generated by swagger codegen. 
+ self.retrieval_backend = cast(neptune_retrieval_api.client.AuthenticatedClient, self.backend) logger.debug("Connected to Neptune API") def submit(self, operation: RunOperation, family: str) -> Response[SubmitResponse]: @@ -153,6 +168,15 @@ def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequ body=RequestIdList(ids=[RequestId(value=request_id) for request_id in request_ids]), ) + def search_entries( + self, project_id: str, body: SearchLeaderboardEntriesParamsDTO + ) -> ProtoLeaderboardEntriesSearchResultDTO: + resp = search_leaderboard_entries_proto.sync_detailed( + client=self.retrieval_backend, project_identifier=project_id, type=["run"], body=body + ) + result = ProtoLeaderboardEntriesSearchResultDTO.FromString(resp.content) + return result + def close(self) -> None: logger.debug("Closing API client") self.backend.__exit__() @@ -181,6 +205,11 @@ def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequ ) return Response(content=b"", parsed=response_body, status_code=HTTPStatus.OK, headers={}) + def search_entries( + self, project_id: str, body: SearchLeaderboardEntriesParamsDTO + ) -> ProtoLeaderboardEntriesSearchResultDTO: + return ProtoLeaderboardEntriesSearchResultDTO() + def backend_factory(api_token: str, mode: Literal["async", "disabled"]) -> ApiClient: if mode == "disabled": diff --git a/src/neptune_scale/net/runs.py b/src/neptune_scale/net/runs.py new file mode 100644 index 00000000..bf14fe86 --- /dev/null +++ b/src/neptune_scale/net/runs.py @@ -0,0 +1,29 @@ +from typing import Optional + +from neptune_retrieval_api.models import SearchLeaderboardEntriesParamsDTO + +from neptune_scale.exceptions import NeptuneScaleError +from neptune_scale.net.api_client import HostedApiClient +from neptune_scale.net.util import escape_nql_criterion +from neptune_scale.sync.util import ensure_api_token + + +def run_exists(project: str, run_id: str, api_token: Optional[str] = None) -> bool: + """Query the backend for the existence of a Run with the given ID. + + Returns True if the Run exists, False otherwise. + """ + + client = HostedApiClient(api_token=ensure_api_token(api_token)) + body = SearchLeaderboardEntriesParamsDTO.from_dict( + { + "query": {"query": f'`sys/custom_run_id`:string = "{escape_nql_criterion(run_id)}"'}, + } + ) + + try: + result = client.search_entries(project, body) + except Exception as e: + raise NeptuneScaleError(reason=e) + + return bool(result.entries) diff --git a/src/neptune_scale/net/util.py b/src/neptune_scale/net/util.py new file mode 100644 index 00000000..cbc25d2e --- /dev/null +++ b/src/neptune_scale/net/util.py @@ -0,0 +1,6 @@ +def escape_nql_criterion(criterion: str) -> str: + """ + Escape backslash and (double-)quotes in the string, to match what the NQL engine expects. 
+ """ + + return criterion.replace("\\", r"\\").replace('"', r"\"") From a4d83bea0b84bf87c158d46ec038a0e4fafe4c8e Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Fri, 10 Jan 2025 17:11:22 +0100 Subject: [PATCH 15/21] Add test for `run_exists()` --- tests/e2e/test_net.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 tests/e2e/test_net.py diff --git a/tests/e2e/test_net.py b/tests/e2e/test_net.py new file mode 100644 index 00000000..263def20 --- /dev/null +++ b/tests/e2e/test_net.py @@ -0,0 +1,10 @@ +import os + +from neptune_scale.net.runs import run_exists + +NEPTUNE_PROJECT = os.getenv("NEPTUNE_E2E_PROJECT") + + +def test_run_exists_true(run): + assert run_exists(run._project, run._run_id) + assert not run_exists(run._project, "nonexistent_run_id") From 98cedc51d90bb5d484e1f2c9e15747a8acd71b1d Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Fri, 10 Jan 2025 17:47:10 +0100 Subject: [PATCH 16/21] Wait for longer in `ProcessLink` test: fixes an issue on Windows runner --- tests/unit/test_process_link.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_process_link.py b/tests/unit/test_process_link.py index f3f68ec3..c030a268 100644 --- a/tests/unit/test_process_link.py +++ b/tests/unit/test_process_link.py @@ -169,7 +169,7 @@ def on_closed(_): link.start(on_link_closed=on_closed) # We should never finish the sleep call, as on_closed raises SystemExit - time.sleep(5) + time.sleep(10) assert False, "on_closed callback was not called" @@ -184,5 +184,5 @@ def test_parent_termination(): p = multiprocessing.Process(target=parent, args=(var, event)) p.start() - assert event.wait(1) + assert event.wait(5) assert var.value == 1 From fd8cd26ea778e201607a91df56f5342d0c90053e Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Tue, 17 Dec 2024 16:02:34 +0100 Subject: [PATCH 17/21] Raise on logging non-increasing series step Even though the backend returns an error in this case as well, We want to detect this locally, so it's easier for the user to debug their code. # Conflicts: # src/neptune_scale/api/attribute.py # tests/unit/test_attribute.py --- src/neptune_scale/api/attribute.py | 52 ++++++++++++++++++++------ src/neptune_scale/sync/sync_process.py | 2 + tests/unit/test_attribute.py | 30 ++++++++++++--- 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py index 734bf3e5..144387d9 100644 --- a/src/neptune_scale/api/attribute.py +++ b/src/neptune_scale/api/attribute.py @@ -1,5 +1,6 @@ import functools import itertools +import threading import warnings from collections.abc import ( Collection, @@ -14,6 +15,7 @@ cast, ) +from neptune_scale.exceptions import NeptuneSeriesStepNonIncreasing from neptune_scale.sync.metadata_splitter import MetadataSplitter from neptune_scale.sync.operations_queue import OperationsQueue @@ -59,6 +61,11 @@ def __init__(self, project: str, run_id: str, operations_queue: OperationsQueue) self._run_id = run_id self._operations_queue = operations_queue self._attributes: dict[str, Attribute] = {} + # Keep a list of path -> (last step, last value) mappings to detect non-increasing steps + # at call site. The backend will detect this error as well, but it's more convenient for the user + # to get the error as soon as possible. 
+ self._metric_state: dict[str, tuple[float, float]] = {} + self._lock = threading.RLock() def __getitem__(self, path: str) -> "Attribute": path = cleanup_path(path) @@ -85,22 +92,43 @@ def log( ) -> None: if timestamp is None: timestamp = datetime.now() - elif isinstance(timestamp, float): + elif isinstance(timestamp, (float, int)): timestamp = datetime.fromtimestamp(timestamp) - splitter: MetadataSplitter = MetadataSplitter( - project=self._project, - run_id=self._run_id, - step=step, - timestamp=timestamp, - configs=configs, - metrics=metrics, - add_tags=tags_add, - remove_tags=tags_remove, + # TODO: Move splitting into the worker process. Here we should just send messages as they are. + chunks = list( + MetadataSplitter( + project=self._project, + run_id=self._run_id, + step=step, + timestamp=timestamp, + configs=configs, + metrics=metrics, + add_tags=tags_add, + remove_tags=tags_remove, + ) ) - for operation, metadata_size in splitter: - self._operations_queue.enqueue(operation=operation, size=metadata_size) + with self._lock: + self._verify_and_update_metrics_state(step, metrics) + + for operation, metadata_size in chunks: + self._operations_queue.enqueue(operation=operation, size=metadata_size) + + def _verify_and_update_metrics_state(self, step: Optional[float], metrics: Optional[dict[str, float]]) -> None: + """Check if step in provided metrics is increasing, raise `NeptuneSeriesStepNonIncreasing` if not.""" + + if step is None or metrics is None: + return + + for metric, value in metrics.items(): + if (state := self._metric_state.get(metric)) is not None: + last_step, last_value = state + # Repeating a step is fine as long as the value does not change + if step < last_step or (step == last_step and value != last_value): + raise NeptuneSeriesStepNonIncreasing() + + self._metric_state[metric] = (step, value) class Attribute: diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py index 4329a2aa..61f22053 100644 --- a/src/neptune_scale/sync/sync_process.py +++ b/src/neptune_scale/sync/sync_process.py @@ -412,6 +412,8 @@ def submit(self, *, operation: RunOperation) -> Optional[SubmitResponse]: def work(self) -> None: try: + # TODO: is there a point in serializing the data on AggregatingQueue? 
It does not move between processes, + # so we could just pass around instances of RunOperation while (operation := self.get_next()) is not None: sequence_id, timestamp, data = operation diff --git a/tests/unit/test_attribute.py b/tests/unit/test_attribute.py index 481651c7..968286b3 100644 --- a/tests/unit/test_attribute.py +++ b/tests/unit/test_attribute.py @@ -8,13 +8,15 @@ ) from neptune_scale.api.attribute import cleanup_path +from neptune_scale.exceptions import NeptuneSeriesStepNonIncreasing from neptune_scale.legacy import Run @fixture def run(api_token): run = Run(project="dummy/project", run_id="dummy-run", mode="disabled", api_token=api_token) - run._attr_store.log = Mock() + # Mock log to be able to assert calls, but also proxy to the actual method so it does its job + run._attr_store.log = Mock(side_effect=run._attr_store.log) with run: yield run @@ -67,11 +69,29 @@ def test_tags(run, store): def test_series(run, store): - run["sys/series"].append(1, step=1, timestamp=10) - store.log.assert_called_with(metrics={"sys/series": 1}, step=1, timestamp=10) + run["my/series"].append(1, step=1, timestamp=10) + store.log.assert_called_with(metrics={"my/series": 1}, step=1, timestamp=10) - run["sys/series"].append({"foo": 1, "bar": 2}, step=2) - store.log.assert_called_with(metrics={"sys/series/foo": 1, "sys/series/bar": 2}, step=2, timestamp=None) + run["my/series"].append({"foo": 1, "bar": 2}, step=2) + store.log.assert_called_with(metrics={"my/series/foo": 1, "my/series/bar": 2}, step=2, timestamp=None) + + +def test_error_on_non_increasing_step(run): + run["series"].append(1, step=2) + + # Step lower than previous + with pytest.raises(NeptuneSeriesStepNonIncreasing): + run["series"].append(2, step=1) + + # Equal to previous, but different value + with pytest.raises(NeptuneSeriesStepNonIncreasing): + run["series"].append(3, step=2) + + # Equal to previous, same value -> should pass + run["series"].append(1, step=2) + + # None should pass, as it means auto-increment + run["series"].append(4, step=None) @pytest.mark.parametrize( From 7aa5f2426f2475c8fc63d308c2788b4a4883dc9e Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Tue, 17 Dec 2024 16:12:10 +0100 Subject: [PATCH 18/21] Fix a test on Windows --- tests/unit/test_attribute.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_attribute.py b/tests/unit/test_attribute.py index 968286b3..072aa84a 100644 --- a/tests/unit/test_attribute.py +++ b/tests/unit/test_attribute.py @@ -1,3 +1,4 @@ +import time from datetime import datetime from unittest.mock import Mock @@ -69,8 +70,9 @@ def test_tags(run, store): def test_series(run, store): - run["my/series"].append(1, step=1, timestamp=10) - store.log.assert_called_with(metrics={"my/series": 1}, step=1, timestamp=10) + now = time.time() + run["my/series"].append(1, step=1, timestamp=now) + store.log.assert_called_with(metrics={"my/series": 1}, step=1, timestamp=now) run["my/series"].append({"foo": 1, "bar": 2}, step=2) store.log.assert_called_with(metrics={"my/series/foo": 1, "my/series/bar": 2}, step=2, timestamp=None) From e9855e1dff1944531041aa2c168d7b8a60462e10 Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Mon, 13 Jan 2025 12:53:47 +0100 Subject: [PATCH 19/21] Add a comment --- src/neptune_scale/api/attribute.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py index 144387d9..e3dbbf7d 100644 --- a/src/neptune_scale/api/attribute.py +++ 
b/src/neptune_scale/api/attribute.py @@ -95,6 +95,8 @@ def log( elif isinstance(timestamp, (float, int)): timestamp = datetime.fromtimestamp(timestamp) + # MetadataSplitter is an iterator, so gather everything into a list instead of iterating over + # it in the critical section, to avoid holding the lock for too long. # TODO: Move splitting into the worker process. Here we should just send messages as they are. chunks = list( MetadataSplitter( From 65c00a45c641adb5dad3be537a34c1b1b75e2a4a Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Fri, 6 Dec 2024 15:38:11 +0100 Subject: [PATCH 20/21] Rename `InternalQueueFeederThread` Rename the class and friends to `OperationDispatcherThread` --- src/neptune_scale/sync/sync_process.py | 40 ++++++++++++++------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py index 4329a2aa..c227d1f7 100644 --- a/src/neptune_scale/sync/sync_process.py +++ b/src/neptune_scale/sync/sync_process.py @@ -179,7 +179,7 @@ def __init__( ) -> None: super().__init__(name="SyncProcess") - self._external_operations_queue: Queue[SingleOperation] = operations_queue + self._input_operations_queue: Queue[SingleOperation] = operations_queue self._errors_queue: ErrorsQueue = errors_queue self._process_link: ProcessLink = process_link self._api_token: str = api_token @@ -213,7 +213,7 @@ def run(self) -> None: family=self._family, api_token=self._api_token, errors_queue=self._errors_queue, - external_operations_queue=self._external_operations_queue, + input_queue=self._input_operations_queue, last_queued_seq=self._last_queued_seq, last_ack_seq=self._last_ack_seq, max_queue_size=self._max_queue_size, @@ -244,7 +244,7 @@ def __init__( family: str, mode: Literal["async", "disabled"], errors_queue: ErrorsQueue, - external_operations_queue: multiprocessing.Queue[SingleOperation], + input_queue: multiprocessing.Queue[SingleOperation], last_queued_seq: SharedInt, last_ack_seq: SharedInt, last_ack_timestamp: SharedFloat, @@ -263,9 +263,9 @@ def __init__( last_queued_seq=last_queued_seq, mode=mode, ) - self._external_to_internal_thread = InternalQueueFeederThread( - external=external_operations_queue, - internal=self._internal_operations_queue, + self._operation_dispatcher_thread = OperationDispatcherThread( + input_queue=input_queue, + operations_queue=self._internal_operations_queue, errors_queue=self._errors_queue, ) self._status_tracking_thread = StatusTrackingThread( @@ -280,11 +280,11 @@ def __init__( @property def threads(self) -> tuple[Daemon, ...]: - return self._external_to_internal_thread, self._sync_thread, self._status_tracking_thread + return self._operation_dispatcher_thread, self._sync_thread, self._status_tracking_thread @property def resources(self) -> tuple[Resource, ...]: - return self._external_to_internal_thread, self._sync_thread, self._status_tracking_thread + return self._operation_dispatcher_thread, self._sync_thread, self._status_tracking_thread def interrupt(self) -> None: for thread in self.threads: @@ -304,17 +304,17 @@ def join(self, timeout: Optional[int] = None) -> None: thread.join(timeout=timeout) -class InternalQueueFeederThread(Daemon, Resource): +class OperationDispatcherThread(Daemon, Resource): def __init__( self, - external: multiprocessing.Queue[SingleOperation], - internal: AggregatingQueue, + input_queue: multiprocessing.Queue[SingleOperation], + operations_queue: AggregatingQueue, errors_queue: ErrorsQueue, ) -> None: - 
super().__init__(name="InternalQueueFeederThread", sleep_time=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) + super().__init__(name="OperationDispatcherThread", sleep_time=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) - self._external: multiprocessing.Queue[SingleOperation] = external - self._internal: AggregatingQueue = internal + self._input_queue: multiprocessing.Queue[SingleOperation] = input_queue + self._operations_queue: AggregatingQueue = operations_queue self._errors_queue: ErrorsQueue = errors_queue self._latest_unprocessed: Optional[SingleOperation] = None @@ -324,7 +324,7 @@ def get_next(self) -> Optional[SingleOperation]: return self._latest_unprocessed try: - self._latest_unprocessed = self._external.get(timeout=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) + self._latest_unprocessed = self._input_queue.get(timeout=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) return self._latest_unprocessed except queue.Empty: return None @@ -340,11 +340,15 @@ def work(self) -> None: continue try: - self._internal.put_nowait(operation) + self._operations_queue.put_nowait(operation) self.commit() except queue.Full: - logger.debug("Internal queue is full (%d elements), waiting for free space", self._internal.maxsize) - self._errors_queue.put(NeptuneOperationsQueueMaxSizeExceeded(max_size=self._internal.maxsize)) + logger.debug( + "Operations queue is full (%d elements), waiting for free space", self._operations_queue.maxsize + ) + self._errors_queue.put( + NeptuneOperationsQueueMaxSizeExceeded(max_size=self._operations_queue.maxsize) + ) # Sleep before retry break except Exception as e: From 9c713a0c1b074a01804fa30347aeca663a7ccdab Mon Sep 17 00:00:00 2001 From: Krzysztof Godlewski Date: Fri, 20 Dec 2024 16:58:37 +0100 Subject: [PATCH 21/21] `OperationDispatcherThread` accepts a list of consumers Instead of pushing messages to a single queue, it's now possible to copy the message to an arbitrary list of consumers. --- src/neptune_scale/sync/sync_process.py | 53 ++++++++++++++++---------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/src/neptune_scale/sync/sync_process.py b/src/neptune_scale/sync/sync_process.py index c227d1f7..098be7b2 100644 --- a/src/neptune_scale/sync/sync_process.py +++ b/src/neptune_scale/sync/sync_process.py @@ -6,6 +6,7 @@ import queue import signal import threading +from collections.abc import Iterable from multiprocessing import ( Process, Queue, @@ -16,6 +17,7 @@ Literal, NamedTuple, Optional, + Protocol, TypeVar, ) @@ -38,7 +40,6 @@ NeptuneConnectionLostError, NeptuneFloatValueNanInfUnsupported, NeptuneInternalServerError, - NeptuneOperationsQueueMaxSizeExceeded, NeptuneProjectInvalidName, NeptuneProjectNotFound, NeptuneRetryableError, @@ -235,6 +236,10 @@ def run(self) -> None: logger.info("Data synchronization finished") +class SupportsPutNowait(Protocol): + def put_nowait(self, element: SingleOperation) -> None: ... 
+ + class SyncProcessWorker(WithResources): def __init__( self, @@ -263,11 +268,7 @@ def __init__( last_queued_seq=last_queued_seq, mode=mode, ) - self._operation_dispatcher_thread = OperationDispatcherThread( - input_queue=input_queue, - operations_queue=self._internal_operations_queue, - errors_queue=self._errors_queue, - ) + self._status_tracking_thread = StatusTrackingThread( api_token=api_token, mode=mode, @@ -278,6 +279,12 @@ def __init__( last_ack_timestamp=last_ack_timestamp, ) + self._operation_dispatcher_thread = OperationDispatcherThread( + input_queue=input_queue, + consumers=[self._internal_operations_queue], + errors_queue=self._errors_queue, + ) + @property def threads(self) -> tuple[Daemon, ...]: return self._operation_dispatcher_thread, self._sync_thread, self._status_tracking_thread @@ -305,16 +312,22 @@ def join(self, timeout: Optional[int] = None) -> None: class OperationDispatcherThread(Daemon, Resource): + """Reads incoming messages from a multiprocessing.Queue, and dispatches them to a list of consumers, + which can be of type `queue.Queue`, but also any other object that supports put_nowait() method. + + If any of the consumers' put_nowait() raises queue.Full, the thread will stop processing further operations. + """ + def __init__( self, input_queue: multiprocessing.Queue[SingleOperation], - operations_queue: AggregatingQueue, + consumers: Iterable[SupportsPutNowait], errors_queue: ErrorsQueue, ) -> None: super().__init__(name="OperationDispatcherThread", sleep_time=INTERNAL_QUEUE_FEEDER_THREAD_SLEEP_TIME) self._input_queue: multiprocessing.Queue[SingleOperation] = input_queue - self._operations_queue: AggregatingQueue = operations_queue + self._consumers = tuple(consumers) self._errors_queue: ErrorsQueue = errors_queue self._latest_unprocessed: Optional[SingleOperation] = None @@ -335,22 +348,22 @@ def commit(self) -> None: def work(self) -> None: try: while not self._is_interrupted(): - operation = self.get_next() - if operation is None: + if (operation := self.get_next()) is None: continue try: - self._operations_queue.put_nowait(operation) + for consumer in self._consumers: + consumer.put_nowait(operation) self.commit() - except queue.Full: - logger.debug( - "Operations queue is full (%d elements), waiting for free space", self._operations_queue.maxsize - ) - self._errors_queue.put( - NeptuneOperationsQueueMaxSizeExceeded(max_size=self._operations_queue.maxsize) - ) - # Sleep before retry - break + except queue.Full as e: + # We have two ways to deal with this situation: + # 1. Consider this a fatal error, and stop processing further operations. + # 2. Retry, assuming that any consumer that _did_ manage to receive the operation, is + # idempotent and can handle the same operation again. + # + # Currently, we choose 1. + logger.error("Operation queue overflow. Neptune will not process further operations.") + raise e except Exception as e: self._errors_queue.put(e) self.interrupt()
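
Note on the consumer fan-out introduced in the final patch above: any object exposing put_nowait() can be registered as a consumer of OperationDispatcherThread, not only the AggregatingQueue. Below is a minimal sketch of wiring a second consumer next to an ordinary queue; it constructs the thread directly and stubs the errors queue with a Mock, the way the unit tests do, rather than going through SyncProcessWorker. RecordingConsumer is a hypothetical illustration and not part of the library.

    import multiprocessing
    import queue
    from unittest.mock import Mock

    from neptune_scale.sync.sync_process import OperationDispatcherThread


    class RecordingConsumer:
        # Hypothetical secondary consumer: keeps every operation it receives.
        # Raising queue.Full from put_nowait() would stop the dispatcher,
        # per the error handling in OperationDispatcherThread.work().
        def __init__(self) -> None:
            self.seen: list = []

        def put_nowait(self, element) -> None:
            self.seen.append(element)


    primary: queue.Queue = queue.Queue(maxsize=1000)
    dispatcher = OperationDispatcherThread(
        input_queue=multiprocessing.Queue(),
        consumers=[primary, RecordingConsumer()],
        errors_queue=Mock(),
    )
    # dispatcher.start() and interrupt() then follow the Daemon lifecycle
    # used by the other threads in sync_process.py.

Because every consumer's put_nowait() is called for each operation, a consumer that raises queue.Full ends processing for all of them, which is the trade-off the comment in work() describes (option 1: treat overflow as fatal rather than retrying against possibly non-idempotent consumers).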