From 44f67ae9b72d34bb760dc72c0f9bfa585e872141 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 9 Jan 2025 12:36:07 -0600 Subject: [PATCH] Do not filter out objects based on last modified timestamp when delete_objects_on_read is enabled Signed-off-by: Taylor Gray --- .../source/s3/S3ScanPartitionCreationSupplier.java | 12 +++++++++--- .../plugins/source/s3/ScanObjectWorker.java | 2 +- .../s3/S3ScanPartitionCreationSupplierTest.java | 14 +++++++++++++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 1d4bf6ea81..b411c4f8d3 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.s3; +import com.google.common.annotations.VisibleForTesting; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; @@ -50,17 +51,21 @@ public class S3ScanPartitionCreationSupplier implements Function scanOptionsList, final S3ScanSchedulingOptions schedulingOptions, - final FolderPartitioningOptions folderPartitioningOptions) { + final FolderPartitioningOptions folderPartitioningOptions, + final boolean deleteS3ObjectsOnRead) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; this.scanOptionsList = scanOptionsList; this.schedulingOptions = schedulingOptions; this.folderPartitioningOptions = folderPartitioningOptions; + this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; } @Override @@ -186,9 +191,10 @@ private void initializeGlobalStateMap(final Map globalStateMap) globalStateMap.put(SINGLE_SCAN_COMPLETE, false); } - private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final Instant previousScanTime, + @VisibleForTesting + boolean isLastModifiedTimeAfterMostRecentScanForBucket(final Instant previousScanTime, final S3Object s3Object) { - if (previousScanTime == null) { + if (previousScanTime == null || deleteS3ObjectsOnRead) { return true; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 4ecd17c584..6de0f6e274 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -129,7 +129,7 @@ public ScanObjectWorker(final S3Client s3Client, this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions()); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead()); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 867bcb1dc5..cb0058a7a0 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -69,15 +69,18 @@ public class S3ScanPartitionCreationSupplierTest { private FolderPartitioningOptions folderPartitioningOptions; + private boolean isDeleteS3ObjectsOnRead; + @BeforeEach void setup() { scanOptionsList = new ArrayList<>(); folderPartitioningOptions = null; + isDeleteS3ObjectsOnRead = false; } private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead); } @Test @@ -452,4 +455,13 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); } + + @Test + void isLastModifiedTimeAfterMostRecentScanForBucket_returns_true_when_disable_s3_objects_on_read_is_enabled() { + isDeleteS3ObjectsOnRead = true; + final S3ScanPartitionCreationSupplier objectUnderTest = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead); + + final boolean result = objectUnderTest.isLastModifiedTimeAfterMostRecentScanForBucket(Instant.now(), mock(S3Object.class)); + assertThat(result, equalTo(true)); + } }