From aecddeb87934be3414605e949a8c52dcf214cb8c Mon Sep 17 00:00:00 2001
From: Michał Sośnicki
Date: Thu, 9 Jan 2025 19:14:25 +0100
Subject: [PATCH 1/2] fix: retain operation timestamp while batching

---
 src/neptune_scale/api/attribute.py          |  2 +-
 src/neptune_scale/sync/aggregating_queue.py |  9 +-
 src/neptune_scale/sync/operations_queue.py  |  3 +-
 src/neptune_scale/sync/queue_element.py     |  3 +-
 tests/unit/test_aggregating_queue.py        | 97 +++++++++++++++------
 5 files changed, 79 insertions(+), 35 deletions(-)

diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py
index ea433137..3b70672c 100644
--- a/src/neptune_scale/api/attribute.py
+++ b/src/neptune_scale/api/attribute.py
@@ -100,7 +100,7 @@ def log(
         )
 
         for operation, metadata_size in splitter:
-            self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step)
+            self._operations_queue.enqueue(operation=operation, size=metadata_size, key=(step, timestamp.timestamp()))
 
 
 class Attribute:
diff --git a/src/neptune_scale/sync/aggregating_queue.py b/src/neptune_scale/sync/aggregating_queue.py
index 8e3fc40e..e63b6c62 100644
--- a/src/neptune_scale/sync/aggregating_queue.py
+++ b/src/neptune_scale/sync/aggregating_queue.py
@@ -8,7 +8,10 @@
     Queue,
 )
 from threading import RLock
-from typing import Optional
+from typing import (
+    Any,
+    Optional,
+)
 
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation
 
@@ -71,7 +74,7 @@ def commit(self) -> None:
 
     def get(self) -> BatchedOperations:
         start = time.monotonic()
-        batch_operations: dict[Optional[float], RunOperation] = {}
+        batch_operations: dict[Any, RunOperation] = {}
         batch_sequence_id: Optional[int] = None
         batch_timestamp: Optional[float] = None
 
@@ -157,7 +160,7 @@ def get(self) -> BatchedOperations:
     )
 
 
-def create_run_batch(operations: dict[Optional[float], RunOperation]) -> RunOperation:
+def create_run_batch(operations: dict[Any, RunOperation]) -> RunOperation:
     if len(operations) == 1:
         return next(iter(operations.values()))
 
diff --git a/src/neptune_scale/sync/operations_queue.py b/src/neptune_scale/sync/operations_queue.py
index 0069b472..dd1dcc60 100644
--- a/src/neptune_scale/sync/operations_queue.py
+++ b/src/neptune_scale/sync/operations_queue.py
@@ -6,6 +6,7 @@
 from time import monotonic
 from typing import (
     TYPE_CHECKING,
+    Any,
     Optional,
 )
 
@@ -57,7 +58,7 @@ def last_timestamp(self) -> Optional[float]:
         with self._lock:
             return self._last_timestamp
 
-    def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[float] = None) -> None:
+    def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[Any] = None) -> None:
         try:
             is_metadata_update = operation.HasField("update")
             serialized_operation = operation.SerializeToString()
diff --git a/src/neptune_scale/sync/queue_element.py b/src/neptune_scale/sync/queue_element.py
index 1c37ff11..f2752079 100644
--- a/src/neptune_scale/sync/queue_element.py
+++ b/src/neptune_scale/sync/queue_element.py
@@ -1,6 +1,7 @@
 __all__ = ("BatchedOperations", "SingleOperation")
 
 from typing import (
+    Any,
     NamedTuple,
     Optional,
 )
@@ -27,4 +28,4 @@ class SingleOperation(NamedTuple):
     # Size of the metadata in the operation (without project, family, run_id etc.)
     metadata_size: Optional[int]
     # Update metadata key
-    batch_key: Optional[float]
+    batch_key: Optional[Any]
diff --git a/tests/unit/test_aggregating_queue.py b/tests/unit/test_aggregating_queue.py
index de9394a0..c2c5a3b1 100644
--- a/tests/unit/test_aggregating_queue.py
+++ b/tests/unit/test_aggregating_queue.py
@@ -6,6 +6,7 @@
 import pytest
 from freezegun import freeze_time
+from google.protobuf.timestamp_pb2 import Timestamp
 from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Run as CreateRun
 from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import (
     Step,
@@ -189,21 +190,23 @@ def test__queue_element_size_limit_with_different_steps():
     update2 = UpdateRunSnapshot(step=Step(whole=2), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)})
     operation1 = RunOperation(update=update1)
     operation2 = RunOperation(update=update2)
+    timestamp1 = time.process_time()
+    timestamp2 = timestamp1 + 1
     element1 = SingleOperation(
         sequence_id=1,
-        timestamp=time.process_time(),
+        timestamp=timestamp1,
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=update1.ByteSize(),
-        batch_key=1.0,
+        batch_key=(1.0, timestamp1),
     )
     element2 = SingleOperation(
         sequence_id=2,
-        timestamp=time.process_time(),
+        timestamp=timestamp2,
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update2.ByteSize(),
-        batch_key=2.0,
+        batch_key=(2.0, timestamp2),
     )
 
     # and
@@ -361,21 +364,23 @@ def test__merge_same_key():
     operation2 = RunOperation(update=update2, project="project", run_id="run_id")
 
     # and
+    timestamp0 = time.process_time()
+    batch_key = (1.0, timestamp0)
     element1 = SingleOperation(
         sequence_id=1,
-        timestamp=time.process_time(),
+        timestamp=timestamp0,
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=update1.ByteSize(),
-        batch_key=1.0,
+        batch_key=batch_key,
     )
     element2 = SingleOperation(
         sequence_id=2,
-        timestamp=time.process_time(),
+        timestamp=timestamp0,
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update2.ByteSize(),
-        batch_key=1.0,
+        batch_key=batch_key,
     )
 
     # and
@@ -390,7 +395,7 @@ def test__merge_same_key():
 
     # then
     assert result.sequence_id == 2
-    assert result.timestamp == element2.timestamp
+    assert result.timestamp == timestamp0
 
     # and
     batch = RunOperation()
@@ -403,7 +408,7 @@ def test__merge_same_key():
 
 
 @freeze_time("2024-09-01")
-def test__merge_two_different_steps():
+def test__batch_two_different_steps():
     # given
     update1 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)})
     update2 = UpdateRunSnapshot(step=Step(whole=2, micro=0), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)})
@@ -413,21 +418,23 @@ def test__merge_two_different_steps():
     operation2 = RunOperation(update=update2, project="project", run_id="run_id")
 
     # and
+    timestamp1 = time.process_time()
+    timestamp2 = timestamp1 + 1
     element1 = SingleOperation(
         sequence_id=1,
-        timestamp=time.process_time(),
+        timestamp=timestamp1,
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=1.0,
+        batch_key=(1.0, timestamp1),
     )
     element2 = SingleOperation(
         sequence_id=2,
-        timestamp=time.process_time(),
+        timestamp=timestamp2,
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=2.0,
+        batch_key=(2.0, timestamp2),
     )
 
     # and
@@ -454,7 +461,7 @@ def test__merge_two_different_steps():
 
 
 @freeze_time("2024-09-01")
-def test__merge_step_with_none():
+def test__batch_step_with_none():
     # given
     update1 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)})
     update2 = UpdateRunSnapshot(step=None, assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)})
@@ -464,13 +471,14 @@ def test__merge_step_with_none():
     operation2 = RunOperation(update=update2, project="project", run_id="run_id")
 
     # and
+    timestamp1 = time.process_time()
     element1 = SingleOperation(
         sequence_id=1,
-        timestamp=time.process_time(),
+        timestamp=timestamp1,
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=1.0,
+        batch_key=(1.0, timestamp1),
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -507,10 +515,27 @@ def test__merge_step_with_none():
 @freeze_time("2024-09-01")
 def test__merge_two_steps_two_metrics():
     # given
-    update1a = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={"aa": Value(int64=10)})
-    update2a = UpdateRunSnapshot(step=Step(whole=2, micro=0), assign={"aa": Value(int64=20)})
-    update1b = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={"bb": Value(int64=100)})
-    update2b = UpdateRunSnapshot(step=Step(whole=2, micro=0), assign={"bb": Value(int64=200)})
+    timestamp0 = int(time.process_time())
+    update1a = UpdateRunSnapshot(
+        step=Step(whole=1, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 1, nanos=0),
+        assign={"aa": Value(int64=10)},
+    )
+    update2a = UpdateRunSnapshot(
+        step=Step(whole=2, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 2, nanos=0),
+        assign={"aa": Value(int64=20)},
+    )
+    update1b = UpdateRunSnapshot(
+        step=Step(whole=1, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 3, nanos=0),
+        assign={"bb": Value(int64=100)},
+    )
+    update2b = UpdateRunSnapshot(
+        step=Step(whole=2, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 4, nanos=0),
+        assign={"bb": Value(int64=200)},
+    )
 
     # and
     operations = [
@@ -522,13 +547,13 @@ def test__merge_two_steps_two_metrics():
     elements = [
         SingleOperation(
             sequence_id=sequence_id,
-            timestamp=time.process_time(),
+            timestamp=timestamp0 + sequence_id,
             operation=operation.SerializeToString(),
             is_batchable=True,
             metadata_size=0,
-            batch_key=batch_key,
+            batch_key=(step, timestamp0 + sequence_id),
         )
-        for sequence_id, batch_key, operation in [
+        for sequence_id, step, operation in [
             (1, 1.0, operations[0]),
             (2, 2.0, operations[1]),
             (3, 1.0, operations[2]),
@@ -554,13 +579,27 @@ def test__merge_two_steps_two_metrics():
     batch = RunOperation()
     batch.ParseFromString(result.operation)
 
-    update1_merged = UpdateRunSnapshot(
-        step=Step(whole=1, micro=0), assign={"aa": Value(int64=10), "bb": Value(int64=100)}
+    update1 = UpdateRunSnapshot(
+        step=Step(whole=1, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 1, nanos=0),
+        assign={"aa": Value(int64=10)},
+    )
+    update2 = UpdateRunSnapshot(
+        step=Step(whole=1, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 3, nanos=0),
+        assign={"bb": Value(int64=100)},
+    )
+    update3 = UpdateRunSnapshot(
+        step=Step(whole=2, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 2, nanos=0),
+        assign={"aa": Value(int64=20)},
     )
-    update2_merged = UpdateRunSnapshot(
-        step=Step(whole=2, micro=0), assign={"aa": Value(int64=20), "bb": Value(int64=200)}
+    update4 = UpdateRunSnapshot(
+        step=Step(whole=2, micro=0),
+        timestamp=Timestamp(seconds=timestamp0 + 4, nanos=0),
+        assign={"bb": Value(int64=200)},
     )
 
     assert batch.project == "project"
     assert batch.run_id == "run_id"
-    assert batch.update_batch.snapshots == [update1_merged, update2_merged]
+    assert batch.update_batch.snapshots == [update1, update2, update3, update4]

From f9a3219d037c090281228fb67db3713c67e8190d Mon Sep 17 00:00:00 2001
From: Michał Sośnicki
Date: Thu, 9 Jan 2025 19:53:27 +0100
Subject: [PATCH 2/2] fix: remove merging, exclusively perform batching

---
 src/neptune_scale/api/attribute.py          |  2 +-
 src/neptune_scale/sync/aggregating_queue.py | 67 +++++----------------
 src/neptune_scale/sync/operations_queue.py  |  4 +--
 src/neptune_scale/sync/queue_element.py     |  3 ---
 tests/unit/test_aggregating_queue.py        | 59 +++---------------
 tests/unit/test_sync_process.py             |  1 -
 6 files changed, 28 insertions(+), 108 deletions(-)

diff --git a/src/neptune_scale/api/attribute.py b/src/neptune_scale/api/attribute.py
index 3b70672c..734bf3e5 100644
--- a/src/neptune_scale/api/attribute.py
+++ b/src/neptune_scale/api/attribute.py
@@ -100,7 +100,7 @@ def log(
         )
 
         for operation, metadata_size in splitter:
-            self._operations_queue.enqueue(operation=operation, size=metadata_size, key=(step, timestamp.timestamp()))
+            self._operations_queue.enqueue(operation=operation, size=metadata_size)
 
 
 class Attribute:
diff --git a/src/neptune_scale/sync/aggregating_queue.py b/src/neptune_scale/sync/aggregating_queue.py
index e63b6c62..0bf47226 100644
--- a/src/neptune_scale/sync/aggregating_queue.py
+++ b/src/neptune_scale/sync/aggregating_queue.py
@@ -8,10 +8,7 @@
     Queue,
 )
 from threading import RLock
-from typing import (
-    Any,
-    Optional,
-)
+from typing import Optional
 
 from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation
 
@@ -74,7 +71,7 @@ def commit(self) -> None:
 
     def get(self) -> BatchedOperations:
         start = time.monotonic()
-        batch_operations: dict[Any, RunOperation] = {}
+        batch_operations: list[RunOperation] = []
         batch_sequence_id: Optional[int] = None
         batch_timestamp: Optional[float] = None
 
@@ -98,7 +95,7 @@ def get(self) -> BatchedOperations:
             if not batch_operations:
                 new_operation = RunOperation()
                 new_operation.ParseFromString(element.operation)
-                batch_operations[element.batch_key] = new_operation
+                batch_operations.append(new_operation)
                 batch_bytes += len(element.operation)
             else:
                 if not element.is_batchable:
@@ -113,10 +110,7 @@ def get(self) -> BatchedOperations:
 
                 new_operation = RunOperation()
                 new_operation.ParseFromString(element.operation)
-                if element.batch_key not in batch_operations:
-                    batch_operations[element.batch_key] = new_operation
-                else:
-                    merge_run_operation(batch_operations[element.batch_key], new_operation)
+                batch_operations.append(new_operation)
                 batch_bytes += element.metadata_size
 
             batch_sequence_id = element.sequence_id
@@ -160,54 +154,25 @@ def get(self) -> BatchedOperations:
     )
 
 
-def create_run_batch(operations: dict[Any, RunOperation]) -> RunOperation:
+def create_run_batch(operations: list[RunOperation]) -> RunOperation:
+    if not operations:
+        raise Empty
+
     if len(operations) == 1:
-        return next(iter(operations.values()))
+        return operations[0]
 
-    batch = None
-    for _, operation in sorted(operations.items(), key=lambda x: (x[0] is not None, x[0])):
-        if batch is None:
-            batch = RunOperation()
-            batch.project = operation.project
-            batch.run_id = operation.run_id
-            batch.create_missing_project = operation.create_missing_project
-            batch.api_key = operation.api_key
+    head = operations[0]
+    batch = RunOperation()
+    batch.project = head.project
+    batch.run_id = head.run_id
+    batch.create_missing_project = head.create_missing_project
+    batch.api_key = head.api_key
 
+    for operation in operations:
         operation_type = operation.WhichOneof("operation")
         if operation_type == "update":
             batch.update_batch.snapshots.append(operation.update)
         else:
             raise ValueError("Cannot batch operation of type %s", operation_type)
 
-    if batch is None:
-        raise Empty
     return batch
-
-
-def merge_run_operation(batch: RunOperation, operation: RunOperation) -> None:
-    """
-    Merge the `operation` into `batch`, taking into account the special case of `modify_sets`.
-
-    Protobuf merges existing map keys by simply overwriting values, instead of calling
-    `MergeFrom` on the existing value, eg: A['foo'] = B['foo'].
-
-    We want this instead:
-
-    batch = {'sys/tags': 'string': { 'values': {'foo': ADD}}}
-    operation = {'sys/tags': 'string': { 'values': {'bar': ADD}}}
-    result = {'sys/tags': 'string': { 'values': {'foo': ADD, 'bar': ADD}}}
-
-    If we called `batch.MergeFrom(operation)` we would get an overwritten value:
-    result = {'sys/tags': 'string': { 'values': {'bar': ADD}}}
-
-    This function ensures that the `modify_sets` are merged correctly, leaving the default
-    behaviour for all other fields.
-    """
-
-    modify_sets = operation.update.modify_sets
-    operation.update.ClearField("modify_sets")
-
-    batch.MergeFrom(operation)
-
-    for k, v in modify_sets.items():
-        batch.update.modify_sets[k].MergeFrom(v)
diff --git a/src/neptune_scale/sync/operations_queue.py b/src/neptune_scale/sync/operations_queue.py
index dd1dcc60..d518da11 100644
--- a/src/neptune_scale/sync/operations_queue.py
+++ b/src/neptune_scale/sync/operations_queue.py
@@ -6,7 +6,6 @@
 from time import monotonic
 from typing import (
     TYPE_CHECKING,
-    Any,
     Optional,
 )
 
@@ -58,7 +57,7 @@ def last_timestamp(self) -> Optional[float]:
         with self._lock:
             return self._last_timestamp
 
-    def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[Any] = None) -> None:
+    def enqueue(self, *, operation: RunOperation, size: Optional[int] = None) -> None:
         try:
             is_metadata_update = operation.HasField("update")
             serialized_operation = operation.SerializeToString()
@@ -76,7 +75,6 @@ def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: O
                     operation=serialized_operation,
                     metadata_size=size,
                     is_batchable=is_metadata_update,
-                    batch_key=key,
                 ),
                 block=True,
                 timeout=None,
diff --git a/src/neptune_scale/sync/queue_element.py b/src/neptune_scale/sync/queue_element.py
index f2752079..521f89e4 100644
--- a/src/neptune_scale/sync/queue_element.py
+++ b/src/neptune_scale/sync/queue_element.py
@@ -1,7 +1,6 @@
 __all__ = ("BatchedOperations", "SingleOperation")
 
 from typing import (
-    Any,
     NamedTuple,
     Optional,
 )
@@ -27,5 +26,3 @@ class SingleOperation(NamedTuple):
     is_batchable: bool
     # Size of the metadata in the operation (without project, family, run_id etc.)
     metadata_size: Optional[int]
-    # Update metadata key
-    batch_key: Optional[Any]
diff --git a/tests/unit/test_aggregating_queue.py b/tests/unit/test_aggregating_queue.py
index c2c5a3b1..0f736f57 100644
--- a/tests/unit/test_aggregating_queue.py
+++ b/tests/unit/test_aggregating_queue.py
@@ -33,7 +33,6 @@ def test__simple():
         operation=operation.SerializeToString(),
         is_batchable=True,
         metadata_size=update.ByteSize(),
-        batch_key=None,
     )
 
     # and
@@ -61,7 +60,6 @@ def test__max_size_exceeded():
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=None,
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -69,7 +67,6 @@ def test__max_size_exceeded():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=None,
     )
 
     # and
@@ -109,7 +106,6 @@ def test__batch_size_limit():
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=update1.ByteSize(),
-        batch_key=None,
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -117,7 +113,6 @@ def test__batch_size_limit():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update2.ByteSize(),
-        batch_key=None,
     )
 
     # and
@@ -149,7 +144,6 @@ def test__batching():
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=update1.ByteSize(),
-        batch_key=None,
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -157,7 +151,6 @@ def test__batching():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update2.ByteSize(),
-        batch_key=None,
     )
 
     # and
@@ -180,7 +173,8 @@ def test__batching():
 
     assert batch.project == "project"
     assert batch.run_id == "run_id"
-    assert all(k in batch.update.assign for k in ["aa0", "aa1", "bb0", "bb1"])
+    assert all(k in batch.update_batch.snapshots[0].assign for k in ["aa0", "aa1"])
+    assert all(k in batch.update_batch.snapshots[1].assign for k in ["bb0", "bb1"])
 
 
 @freeze_time("2024-09-01")
@@ -198,7 +192,6 @@ def test__queue_element_size_limit_with_different_steps():
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=update1.ByteSize(),
-        batch_key=(1.0, timestamp1),
    )
     element2 = SingleOperation(
         sequence_id=2,
@@ -206,7 +199,6 @@ def test__queue_element_size_limit_with_different_steps():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update2.ByteSize(),
-        batch_key=(2.0, timestamp2),
     )
 
     # and
@@ -238,7 +230,6 @@ def test__not_merge_two_run_creation():
         operation=operation1.SerializeToString(),
         is_batchable=False,
         metadata_size=0,
-        batch_key=None,
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -246,7 +237,6 @@ def test__not_merge_two_run_creation():
         operation=operation2.SerializeToString(),
         is_batchable=False,
         metadata_size=0,
-        batch_key=None,
     )
 
     # and
@@ -304,7 +294,6 @@ def test__not_merge_run_creation_with_metadata_update():
         operation=operation1.SerializeToString(),
         is_batchable=False,
         metadata_size=0,
-        batch_key=None,
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -312,7 +301,6 @@ def test__not_merge_run_creation_with_metadata_update():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update.ByteSize(),
-        batch_key=None,
     )
 
     # and
@@ -354,7 +342,7 @@ def test__not_merge_run_creation_with_metadata_update():
 
 
 @freeze_time("2024-09-01")
-def test__merge_same_key():
+def test__batch_same_key():
     # given
     update1 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)})
     update2 = UpdateRunSnapshot(step=Step(whole=1, micro=0), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)})
@@ -365,14 +353,12 @@ def test__merge_same_key():
 
     # and
     timestamp0 = time.process_time()
-    batch_key = (1.0, timestamp0)
     element1 = SingleOperation(
         sequence_id=1,
         timestamp=timestamp0,
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=update1.ByteSize(),
-        batch_key=batch_key,
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -380,7 +366,6 @@ def test__merge_same_key():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=update2.ByteSize(),
-        batch_key=batch_key,
     )
 
     # and
@@ -403,8 +388,10 @@ def test__merge_same_key():
 
     assert batch.project == "project"
     assert batch.run_id == "run_id"
-    assert batch.update.step == Step(whole=1, micro=0)
-    assert all(k in batch.update.assign for k in ["aa0", "aa1", "bb0", "bb1"])
+    assert batch.update_batch.snapshots[0].step == Step(whole=1, micro=0)
+    assert batch.update_batch.snapshots[1].step == Step(whole=1, micro=0)
+    assert all(k in batch.update_batch.snapshots[0].assign for k in ["aa0", "aa1"])
+    assert all(k in batch.update_batch.snapshots[1].assign for k in ["bb0", "bb1"])
 
 
 @freeze_time("2024-09-01")
@@ -426,7 +413,6 @@ def test__batch_two_different_steps():
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=(1.0, timestamp1),
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -434,7 +420,6 @@ def test__batch_two_different_steps():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=(2.0, timestamp2),
     )
 
     # and
@@ -478,7 +463,6 @@ def test__batch_step_with_none():
         operation=operation1.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=(1.0, timestamp1),
     )
     element2 = SingleOperation(
         sequence_id=2,
@@ -486,7 +470,6 @@ def test__batch_step_with_none():
         operation=operation2.SerializeToString(),
         is_batchable=True,
         metadata_size=0,
-        batch_key=None,
     )
 
     # and
@@ -509,11 +492,11 @@ def test__batch_step_with_none():
 
     assert batch.project == "project"
     assert batch.run_id == "run_id"
-    assert batch.update_batch.snapshots == [update2, update1]  # None is always first
+    assert batch.update_batch.snapshots == [update1, update2]
 
 
 @freeze_time("2024-09-01")
-def test__merge_two_steps_two_metrics():
+def test__batch_two_steps_two_metrics():
     # given
     timestamp0 = int(time.process_time())
     update1a = UpdateRunSnapshot(
@@ -551,7 +534,6 @@ def test__merge_two_steps_two_metrics():
             operation=operation.SerializeToString(),
             is_batchable=True,
             metadata_size=0,
-            batch_key=(step, timestamp0 + sequence_id),
         )
         for sequence_id, step, operation in [
             (1, 1.0, operations[0]),
@@ -579,27 +561,6 @@ def test__merge_two_steps_two_metrics():
     batch = RunOperation()
     batch.ParseFromString(result.operation)
 
-    update1 = UpdateRunSnapshot(
-        step=Step(whole=1, micro=0),
-        timestamp=Timestamp(seconds=timestamp0 + 1, nanos=0),
-        assign={"aa": Value(int64=10)},
-    )
-    update2 = UpdateRunSnapshot(
-        step=Step(whole=1, micro=0),
-        timestamp=Timestamp(seconds=timestamp0 + 3, nanos=0),
-        assign={"bb": Value(int64=100)},
-    )
-    update3 = UpdateRunSnapshot(
-        step=Step(whole=2, micro=0),
-        timestamp=Timestamp(seconds=timestamp0 + 2, nanos=0),
-        assign={"aa": Value(int64=20)},
-    )
-    update4 = UpdateRunSnapshot(
-        step=Step(whole=2, micro=0),
-        timestamp=Timestamp(seconds=timestamp0 + 4, nanos=0),
-        assign={"bb": Value(int64=200)},
-    )
-
     assert batch.project == "project"
     assert batch.run_id == "run_id"
-    assert batch.update_batch.snapshots == [update1, update2, update3, update4]
+    assert batch.update_batch.snapshots == [update1a, update2a, update1b, update2b]
diff --git a/tests/unit/test_sync_process.py b/tests/unit/test_sync_process.py
index 1709510a..25b17918 100644
--- a/tests/unit/test_sync_process.py
+++ b/tests/unit/test_sync_process.py
@@ -35,7 +35,6 @@ def single_operation(update: UpdateRunSnapshot, sequence_id):
         operation=operation.SerializeToString(),
         is_batchable=True,
         metadata_size=update.ByteSize(),
-        batch_key=None,
     )
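
A minimal sketch of the behaviour the series converges on (illustration only,
not part of either patch; it assumes a source tree with both patches applied
and the neptune-api protos installed — every name below is taken from the
diffs above). After PATCH 2/2, create_run_batch() accepts a plain list and
appends one update_batch snapshot per operation, so each operation keeps its
own step and timestamp instead of being merged under a batch key:

    from google.protobuf.timestamp_pb2 import Timestamp
    from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import (
        Step,
        UpdateRunSnapshot,
        Value,
    )
    from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

    from neptune_scale.sync.aggregating_queue import create_run_batch

    # Two updates of the same step but with different timestamps: batching
    # keyed only by step (before PATCH 1/2) would have merged them into one
    # snapshot; the list-based batching keeps both, timestamps intact.
    op1 = RunOperation(
        project="project",
        run_id="run_id",
        update=UpdateRunSnapshot(
            step=Step(whole=1, micro=0),
            timestamp=Timestamp(seconds=100, nanos=0),
            assign={"aa": Value(int64=10)},
        ),
    )
    op2 = RunOperation(
        project="project",
        run_id="run_id",
        update=UpdateRunSnapshot(
            step=Step(whole=1, micro=0),
            timestamp=Timestamp(seconds=101, nanos=0),
            assign={"bb": Value(int64=100)},
        ),
    )

    batch = create_run_batch([op1, op2])
    assert len(batch.update_batch.snapshots) == 2  # one snapshot per operation
    assert batch.update_batch.snapshots[0].timestamp.seconds == 100  # retained
    assert batch.update_batch.snapshots[1].timestamp.seconds == 101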
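For background on the merge_run_operation() helper that PATCH 2/2 deletes: its
docstring describes how protobuf's MergeFrom() overwrites map entries wholesale
instead of merging their values. The pitfall reproduces standalone with the
well-known Struct type, used here purely as a stand-in for the ingest protos'
modify_sets map:

    from google.protobuf.struct_pb2 import Struct

    batch = Struct()
    batch.update({"sys/tags": {"foo": "ADD"}})

    operation = Struct()
    operation.update({"sys/tags": {"bar": "ADD"}})

    # Map keys present in both messages are replaced, not merged:
    # effectively batch["sys/tags"] = operation["sys/tags"].
    batch.MergeFrom(operation)

    assert "bar" in batch["sys/tags"]
    assert "foo" not in batch["sys/tags"]  # the pending "foo" ADD is lost

With merging gone (each operation is appended as its own snapshot), this edge
case no longer arises, which is why the helper can be deleted outright.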