From 933c1175ad53285d2b1e03c63619d503f61c80b2 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 9 Jan 2025 12:36:07 -0600 Subject: [PATCH] Do not filter out objects based on last modified timestamp when delete_objects_on_read is enabled Signed-off-by: Taylor Gray --- .../s3/S3ScanPartitionCreationSupplier.java | 12 +- .../plugins/source/s3/ScanObjectWorker.java | 2 +- .../S3ScanPartitionCreationSupplierTest.java | 134 +++++++++++++++++- 3 files changed, 143 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 1d4bf6ea81..f757bf3e6e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.s3; +import com.google.common.annotations.VisibleForTesting; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; @@ -50,17 +51,21 @@ public class S3ScanPartitionCreationSupplier implements Function scanOptionsList, final S3ScanSchedulingOptions schedulingOptions, - final FolderPartitioningOptions folderPartitioningOptions) { + final FolderPartitioningOptions folderPartitioningOptions, + final boolean deleteS3ObjectsOnRead) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; this.scanOptionsList = scanOptionsList; this.schedulingOptions = schedulingOptions; this.folderPartitioningOptions = folderPartitioningOptions; + this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; } @Override @@ -120,7 +125,7 @@ private List listFilteredS3ObjectsForBucket(final List isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object)) + .filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object)) .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified()))) .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) .filter(keyTimestampPair -> excludeKeyPaths.stream() @@ -186,7 +191,8 @@ private void initializeGlobalStateMap(final Map globalStateMap) globalStateMap.put(SINGLE_SCAN_COMPLETE, false); } - private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final Instant previousScanTime, + @VisibleForTesting + boolean isLastModifiedTimeAfterMostRecentScanForBucket(final Instant previousScanTime, final S3Object s3Object) { if (previousScanTime == null) { return true; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 4ecd17c584..6de0f6e274 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -129,7 +129,7 @@ public ScanObjectWorker(final S3Client s3Client, this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions()); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead()); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 867bcb1dc5..5249573ee8 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -48,6 +48,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -69,15 +70,18 @@ public class S3ScanPartitionCreationSupplierTest { private FolderPartitioningOptions folderPartitioningOptions; + private boolean isDeleteS3ObjectsOnRead; + @BeforeEach void setup() { scanOptionsList = new ArrayList<>(); folderPartitioningOptions = null; + isDeleteS3ObjectsOnRead = false; } private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead); } @Test @@ -452,4 +456,132 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); } + + @Test + void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_objects_on_read_is_enabled() { + schedulingOptions = mock(S3ScanSchedulingOptions.class); + given(schedulingOptions.getInterval()).willReturn(Duration.ofMillis(0)); + given(schedulingOptions.getCount()).willReturn(2); + isDeleteS3ObjectsOnRead = true; + + final String firstBucket = "bucket-one"; + final String secondBucket = "bucket-two"; + + final ScanOptions firstBucketScanOptions = mock(ScanOptions.class); + final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class); + given(firstBucketScanOptions.getBucketOption()).willReturn(firstBucketScanBucketOption); + given(firstBucketScanBucketOption.getName()).willReturn(firstBucket); + given(firstBucketScanOptions.getUseStartDateTime()).willReturn(null); + given(firstBucketScanOptions.getUseEndDateTime()).willReturn(null); + final S3ScanKeyPathOption firstBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(firstBucketScanBucketOption.getS3ScanFilter()).willReturn(firstBucketScanKeyPath); + given(firstBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(List.of(UUID.randomUUID().toString())); + given(firstBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(List.of(".invalid")); + scanOptionsList.add(firstBucketScanOptions); + + final ScanOptions secondBucketScanOptions = mock(ScanOptions.class); + final S3ScanBucketOption secondBucketScanBucketOption = mock(S3ScanBucketOption.class); + given(secondBucketScanOptions.getBucketOption()).willReturn(secondBucketScanBucketOption); + given(secondBucketScanBucketOption.getName()).willReturn(secondBucket); + given(secondBucketScanOptions.getUseStartDateTime()).willReturn(null); + given(secondBucketScanOptions.getUseEndDateTime()).willReturn(null); + final S3ScanKeyPathOption secondBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(secondBucketScanBucketOption.getS3ScanFilter()).willReturn(secondBucketScanKeyPath); + given(secondBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(null); + given(secondBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(null); + scanOptionsList.add(secondBucketScanOptions); + + final Function, List> partitionCreationSupplier = createObjectUnderTest(); + + final List expectedPartitionIdentifiers = new ArrayList<>(); + + final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); + final List s3ObjectsList = new ArrayList<>(); + + final S3Object invalidFolderObject = mock(S3Object.class); + given(invalidFolderObject.key()).willReturn("folder-key/"); + given(invalidFolderObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidFolderObject); + + final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class); + given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); + given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2)); + s3ObjectsList.add(invalidForFirstBucketSuffixObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + + final Instant mostRecentFirstScan = Instant.now().plusSeconds(2); + final S3Object validObject = mock(S3Object.class); + given(validObject.key()).willReturn("valid"); + given(validObject.lastModified()).willReturn(mostRecentFirstScan); + s3ObjectsList.add(validObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + + final S3Object secondScanObject = mock(S3Object.class); + final Instant mostRecentSecondScan = Instant.now().plusSeconds(10); + given(secondScanObject.key()).willReturn("second-scan"); + given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan); + + final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + + // Since delete objects on read is enabled, the second scan will pick up the same object from the first scan + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + + final List secondScanObjects = new ArrayList<>(s3ObjectsList); + secondScanObjects.add(secondScanObject); + given(listObjectsResponse.contents()) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects); + + final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); + given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + + final Map globalStateMap = new HashMap<>(); + + final Instant beforeFirstScan = Instant.now(); + final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + + assertThat(resultingPartitions, notNullValue()); + assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1)); + assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), lessThanOrEqualTo(mostRecentFirstScan)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), greaterThanOrEqualTo(beforeFirstScan)); + assertThat(globalStateMap.containsKey(secondBucket), equalTo(true)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), lessThanOrEqualTo(mostRecentFirstScan)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), greaterThanOrEqualTo(beforeFirstScan)); + + final Instant beforeSecondScan = Instant.now(); + final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); + assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); + assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(2)); + assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), lessThanOrEqualTo(mostRecentSecondScan)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), greaterThanOrEqualTo(beforeSecondScan)); + assertThat(globalStateMap.containsKey(secondBucket), equalTo(true)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), lessThanOrEqualTo(mostRecentSecondScan)); + assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), greaterThan(beforeSecondScan)); + assertThat(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME)).isBefore(Instant.now()), equalTo(true)); + + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + + verify(listObjectsResponse, times(4)).contents(); + } }