Skip to content

Commit

Permalink
Do not filter out objects based on last modified timestamp when delet…
Browse files Browse the repository at this point in the history
…e_objects_on_read is enabled

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Jan 10, 2025
1 parent f264c09 commit ef84434
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.s3;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
Expand Down Expand Up @@ -50,17 +51,21 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj

private final FolderPartitioningOptions folderPartitioningOptions;

private final boolean deleteS3ObjectsOnRead;

public S3ScanPartitionCreationSupplier(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final List<ScanOptions> scanOptionsList,
final S3ScanSchedulingOptions schedulingOptions,
final FolderPartitioningOptions folderPartitioningOptions) {
final FolderPartitioningOptions folderPartitioningOptions,
final boolean deleteS3ObjectsOnRead) {

this.s3Client = s3Client;
this.bucketOwnerProvider = bucketOwnerProvider;
this.scanOptionsList = scanOptionsList;
this.schedulingOptions = schedulingOptions;
this.folderPartitioningOptions = folderPartitioningOptions;
this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
}

@Override
Expand Down Expand Up @@ -120,7 +125,7 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
do {
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build());
allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream()
.filter(s3Object -> isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object))
.filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object))
.map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified())))
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
.filter(keyTimestampPair -> excludeKeyPaths.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions());
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead());
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -69,15 +70,18 @@ public class S3ScanPartitionCreationSupplierTest {

private FolderPartitioningOptions folderPartitioningOptions;

private boolean isDeleteS3ObjectsOnRead;

@BeforeEach
void setup() {
scanOptionsList = new ArrayList<>();
folderPartitioningOptions = null;
isDeleteS3ObjectsOnRead = false;
}


private Function<Map<String, Object>, List<PartitionIdentifier>> createObjectUnderTest() {
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions);
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead);
}

@Test
Expand Down Expand Up @@ -452,4 +456,132 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part
assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));
}

@Test
void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_objects_on_read_is_enabled() {
schedulingOptions = mock(S3ScanSchedulingOptions.class);
given(schedulingOptions.getInterval()).willReturn(Duration.ofMillis(0));
given(schedulingOptions.getCount()).willReturn(2);
isDeleteS3ObjectsOnRead = true;

final String firstBucket = "bucket-one";
final String secondBucket = "bucket-two";

final ScanOptions firstBucketScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class);
given(firstBucketScanOptions.getBucketOption()).willReturn(firstBucketScanBucketOption);
given(firstBucketScanBucketOption.getName()).willReturn(firstBucket);
given(firstBucketScanOptions.getUseStartDateTime()).willReturn(null);
given(firstBucketScanOptions.getUseEndDateTime()).willReturn(null);
final S3ScanKeyPathOption firstBucketScanKeyPath = mock(S3ScanKeyPathOption.class);
given(firstBucketScanBucketOption.getS3ScanFilter()).willReturn(firstBucketScanKeyPath);
given(firstBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(List.of(UUID.randomUUID().toString()));
given(firstBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(List.of(".invalid"));
scanOptionsList.add(firstBucketScanOptions);

final ScanOptions secondBucketScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption secondBucketScanBucketOption = mock(S3ScanBucketOption.class);
given(secondBucketScanOptions.getBucketOption()).willReturn(secondBucketScanBucketOption);
given(secondBucketScanBucketOption.getName()).willReturn(secondBucket);
given(secondBucketScanOptions.getUseStartDateTime()).willReturn(null);
given(secondBucketScanOptions.getUseEndDateTime()).willReturn(null);
final S3ScanKeyPathOption secondBucketScanKeyPath = mock(S3ScanKeyPathOption.class);
given(secondBucketScanBucketOption.getS3ScanFilter()).willReturn(secondBucketScanKeyPath);
given(secondBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(null);
given(secondBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(null);
scanOptionsList.add(secondBucketScanOptions);

final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = createObjectUnderTest();

final List<PartitionIdentifier> expectedPartitionIdentifiers = new ArrayList<>();

final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class);
final List<S3Object> s3ObjectsList = new ArrayList<>();

final S3Object invalidFolderObject = mock(S3Object.class);
given(invalidFolderObject.key()).willReturn("folder-key/");
given(invalidFolderObject.lastModified()).willReturn(Instant.now());
s3ObjectsList.add(invalidFolderObject);

final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class);
given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid");
given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2));
s3ObjectsList.add(invalidForFirstBucketSuffixObject);
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build());

final Instant mostRecentFirstScan = Instant.now().plusSeconds(2);
final S3Object validObject = mock(S3Object.class);
given(validObject.key()).willReturn("valid");
given(validObject.lastModified()).willReturn(mostRecentFirstScan);
s3ObjectsList.add(validObject);
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build());
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build());

final S3Object secondScanObject = mock(S3Object.class);
final Instant mostRecentSecondScan = Instant.now().plusSeconds(10);
given(secondScanObject.key()).willReturn("second-scan");
given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan);

final List<PartitionIdentifier> expectedPartitionIdentifiersSecondScan = new ArrayList<>();
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build());

// Since delete objects on read is enabled, the second scan will pick up the same object from the first scan
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build());

final List<S3Object> secondScanObjects = new ArrayList<>(s3ObjectsList);
secondScanObjects.add(secondScanObject);
given(listObjectsResponse.contents())
.willReturn(s3ObjectsList)
.willReturn(s3ObjectsList)
.willReturn(secondScanObjects)
.willReturn(secondScanObjects);

final ArgumentCaptor<ListObjectsV2Request> listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse);

final Map<String, Object> globalStateMap = new HashMap<>();

final Instant beforeFirstScan = Instant.now();
final List<PartitionIdentifier> resultingPartitions = partitionCreationSupplier.apply(globalStateMap);

assertThat(resultingPartitions, notNullValue());
assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size()));
assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey)
.map(Matchers::equalTo).collect(Collectors.toList())));

assertThat(globalStateMap, notNullValue());
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));
assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1));
assertThat(globalStateMap.containsKey(firstBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), lessThanOrEqualTo(mostRecentFirstScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), greaterThanOrEqualTo(beforeFirstScan));
assertThat(globalStateMap.containsKey(secondBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), lessThanOrEqualTo(mostRecentFirstScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), greaterThanOrEqualTo(beforeFirstScan));

final Instant beforeSecondScan = Instant.now();
final List<PartitionIdentifier> secondScanPartitions = partitionCreationSupplier.apply(globalStateMap);
assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size()));
assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));

assertThat(globalStateMap, notNullValue());
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));
assertThat(globalStateMap.get(SCAN_COUNT), equalTo(2));
assertThat(globalStateMap.containsKey(firstBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), lessThanOrEqualTo(mostRecentSecondScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)), greaterThanOrEqualTo(beforeSecondScan));
assertThat(globalStateMap.containsKey(secondBucket), equalTo(true));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), lessThanOrEqualTo(mostRecentSecondScan));
assertThat(Instant.parse((CharSequence) globalStateMap.get(secondBucket)), greaterThan(beforeSecondScan));
assertThat(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME)).isBefore(Instant.now()), equalTo(true));

assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList()));

verify(listObjectsResponse, times(4)).contents();
}
}

0 comments on commit ef84434

Please sign in to comment.