diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java index 9ed4973fc9..189fbf01f6 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java @@ -237,7 +237,8 @@ public Optional getAvailablePartition(final String own final Duration ownershipTimeout, final SourcePartitionStatus sourcePartitionStatus, final String sourceStatusCombinationKey, - final int pageLimit) { + final int pageLimit, + final Duration ttl) { try { final DynamoDbIndex sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX); @@ -273,8 +274,11 @@ public Optional getAvailablePartition(final String own item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); item.setSourceStatusCombinationKey(String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, item.getSourceIdentifier(), SourcePartitionStatus.ASSIGNED)); item.setPartitionPriority(partitionOwnershipTimeout.toString()); - final boolean acquired = this.tryAcquirePartitionItem(item); + if (Objects.nonNull(ttl)) { + item.setExpirationTime(Instant.now().plus(ttl).getEpochSecond()); + } + final boolean acquired = this.tryAcquirePartitionItem(item); if (acquired) { return Optional.of(item); } diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java index ffc10f4b64..25dde68499 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java @@ -98,7 +98,7 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier, public Optional tryAcquireAvailablePartition(final String sourceIdentifier, final String ownerId, final Duration ownershipTimeout) { final Optional acquiredAssignedItem = dynamoDbClientWrapper.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1); + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1, dynamoStoreSettings.getTtl()); if (acquiredAssignedItem.isPresent()) { return acquiredAssignedItem; @@ -106,7 +106,7 @@ public Optional tryAcquireAvailablePartition(final Str final Optional acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5); + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5, dynamoStoreSettings.getTtl()); if (acquiredUnassignedItem.isPresent()) { return acquiredUnassignedItem; @@ -114,7 +114,7 @@ public Optional tryAcquireAvailablePartition(final Str return dynamoDbClientWrapper.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1); + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1, dynamoStoreSettings.getTtl()); } @Override diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java index 8740086539..d7c340c6a0 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java @@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -464,8 +465,10 @@ void getAvailablePartition_with_no_items_from_query_returns_empty_optional(final final int pageLimit = new Random().nextInt(20); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit, ttl); assertThat(result.isEmpty(), equalTo(true)); @@ -513,8 +516,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest(); reflectivelySetField(objectUnderTest, "table", table); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); assertThat(result.isPresent(), equalTo(true)); assertThat(result.get(), equalTo(acquiredItem)); @@ -531,6 +536,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true)); + final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); + verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); } @@ -574,8 +583,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest(); reflectivelySetField(objectUnderTest, "table", table); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); assertThat(result.isPresent(), equalTo(true)); assertThat(result.get(), equalTo(acquiredItem)); @@ -593,6 +604,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true)); verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); + + final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); } @ParameterizedTest @@ -635,8 +650,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest(); reflectivelySetField(objectUnderTest, "table", table); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); assertThat(result.isEmpty(), equalTo(true)); @@ -653,6 +670,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true)); verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); + + final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); } @Test @@ -681,7 +702,7 @@ void getAvailablePartition_with_assigned_partition_with_unexpired_partitionOwner reflectivelySetField(objectUnderTest, "table", table); final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt())); assertThat(result.isEmpty(), equalTo(true)); @@ -716,7 +737,7 @@ void getAvailablePartition_with_closed_partition_with_unreached_reOpenAt_time_re reflectivelySetField(objectUnderTest, "table", table); final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt())); assertThat(result.isEmpty(), equalTo(true)); diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java index 9730d028ab..2150fa55c8 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java @@ -245,21 +245,23 @@ void getAvailablePartition_with_no_item_acquired_returns_empty_optional() { final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), - 5)) + 5, ttl)) .willReturn(Optional.empty()); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); @@ -272,13 +274,14 @@ void getAvailablePartition_with_acquired_ASSIGNED_partition_returns_the_partitio final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); - + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.of(acquiredItem)); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); @@ -294,23 +297,25 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition( final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), - 5)) + 5, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), - 1)) + 1, ttl)) .willReturn(Optional.of(acquiredItem)); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); @@ -326,18 +331,20 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), - 5)) + 5, ttl)) .willReturn(Optional.of(acquiredItem)); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);