Skip to content

Commit

Permalink
Do not filter out objects based on last modified timestamp when delet…
Browse files Browse the repository at this point in the history
…e_objects_on_read is enabled

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Jan 9, 2025
1 parent f264c09 commit 44f67ae
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.s3;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
Expand Down Expand Up @@ -50,17 +51,21 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj

private final FolderPartitioningOptions folderPartitioningOptions;

private final boolean deleteS3ObjectsOnRead;

public S3ScanPartitionCreationSupplier(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final List<ScanOptions> scanOptionsList,
final S3ScanSchedulingOptions schedulingOptions,
final FolderPartitioningOptions folderPartitioningOptions) {
final FolderPartitioningOptions folderPartitioningOptions,
final boolean deleteS3ObjectsOnRead) {

this.s3Client = s3Client;
this.bucketOwnerProvider = bucketOwnerProvider;
this.scanOptionsList = scanOptionsList;
this.schedulingOptions = schedulingOptions;
this.folderPartitioningOptions = folderPartitioningOptions;
this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
}

@Override
Expand Down Expand Up @@ -186,9 +191,10 @@ private void initializeGlobalStateMap(final Map<String, Object> globalStateMap)
globalStateMap.put(SINGLE_SCAN_COMPLETE, false);
}

private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final Instant previousScanTime,
@VisibleForTesting
boolean isLastModifiedTimeAfterMostRecentScanForBucket(final Instant previousScanTime,
final S3Object s3Object) {
if (previousScanTime == null) {
if (previousScanTime == null || deleteS3ObjectsOnRead) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions());
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead());
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,18 @@ public class S3ScanPartitionCreationSupplierTest {

private FolderPartitioningOptions folderPartitioningOptions;

private boolean isDeleteS3ObjectsOnRead;

@BeforeEach
void setup() {
scanOptionsList = new ArrayList<>();
folderPartitioningOptions = null;
isDeleteS3ObjectsOnRead = false;
}


private Function<Map<String, Object>, List<PartitionIdentifier>> createObjectUnderTest() {
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions);
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead);
}

@Test
Expand Down Expand Up @@ -452,4 +455,13 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part
assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));
}

@Test
void isLastModifiedTimeAfterMostRecentScanForBucket_returns_true_when_disable_s3_objects_on_read_is_enabled() {
isDeleteS3ObjectsOnRead = true;
final S3ScanPartitionCreationSupplier objectUnderTest = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead);

final boolean result = objectUnderTest.isLastModifiedTimeAfterMostRecentScanForBucket(Instant.now(), mock(S3Object.class));
assertThat(result, equalTo(true));
}
}

0 comments on commit 44f67ae

Please sign in to comment.