From 97c82f4539bb8a7bf9d375148e6886f20e7ab9e0 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 9 Jan 2025 05:52:10 +0000 Subject: [PATCH] Use async client for delete blob or path in S3 Blob Container (#16788) * Use async client for delete blob or path in S3 Blob Container Signed-off-by: Ashish Singh * Fix UTs Signed-off-by: Ashish Singh * Fix failures in S3BlobStoreRepositoryTests Signed-off-by: Ashish Singh * Fix S3BlobStoreRepositoryTests Signed-off-by: Ashish Singh * Fix failures in S3RepositoryThirdPartyTests Signed-off-by: Ashish Singh * Fix failures in S3RepositoryPluginTests Signed-off-by: Ashish Singh --------- Signed-off-by: Ashish Singh (cherry picked from commit 1d4b85f5ff8c4e314ecf49190b68eb995bf571d8) Signed-off-by: github-actions[bot] --- .../s3/S3BlobStoreRepositoryTests.java | 27 +- .../s3/S3RepositoryThirdPartyTests.java | 8 - .../repositories/s3/S3AsyncService.java | 30 +- .../repositories/s3/S3BlobContainer.java | 140 +------ .../repositories/s3/S3RepositoryPlugin.java | 19 +- .../s3/S3BlobStoreContainerTests.java | 347 +++++++++++++----- .../s3/S3RepositoryPluginTests.java | 11 +- .../common/settings/ClusterSettings.java | 1 - .../blobstore/BlobStoreRepository.java | 38 +- 9 files changed, 322 insertions(+), 299 deletions(-) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index 909c64ce25372..c4b47f3cc899f 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -65,6 +65,7 @@ import org.opensearch.repositories.RepositoryStats; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase; +import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.repositories.s3.utils.AwsRequestSigner; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotsService; @@ -166,7 +167,6 @@ protected Settings nodeSettings(int nodeOrdinal) { // Disable request throttling because some random values in tests might generate too many failures for the S3 client .put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false) .put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT) - .put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false) .put(super.nodeSettings(nodeOrdinal)) .setSecureSettings(secureSettings); @@ -316,22 +316,27 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - + AsyncTransferManager asyncUploadUtils = new AsyncTransferManager( + S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(), + normalExecutorBuilder.getStreamReader(), + priorityExecutorBuilder.getStreamReader(), + urgentExecutorBuilder.getStreamReader(), + transferSemaphoresHolder + ); return new S3Repository( metadata, registry, service, clusterService, recoverySettings, - null, - null, - null, - null, - null, - false, - null, - null, + asyncUploadUtils, + urgentExecutorBuilder, + priorityExecutorBuilder, + normalExecutorBuilder, + s3AsyncService, + S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), + normalPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ, genericStatsMetricPublisher ) { diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java index f0e40db965646..7db9a0d3ba790 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -55,14 +55,6 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase { - @Override - protected Settings nodeSettings() { - return Settings.builder() - .put(super.nodeSettings()) - .put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false) - .build(); - } - @Override @Before @SuppressForbidden(reason = "Need to set system property here for AWS SDK v2") diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index 8bbef168de89c..7397c3132c17c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -25,7 +25,6 @@ import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; -import software.amazon.awssdk.profiles.ProfileFileSystemSetting; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; @@ -120,6 +119,7 @@ public AmazonAsyncS3Reference client( if (existing != null && existing.tryIncRef()) { return existing; } + final AmazonAsyncS3Reference clientReference = new AmazonAsyncS3Reference( buildClient(clientSettings, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder) ); @@ -235,17 +235,17 @@ synchronized AmazonAsyncS3WithCredentials buildClient( } static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) { + RetryPolicy retryPolicy = SocketAccess.doPrivileged( + () -> RetryPolicy.builder() + .numRetries(clientSettings.maxRetries) + .throttlingBackoffStrategy( + clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) : BackoffStrategy.none() + ) + .build() + ); + return ClientOverrideConfiguration.builder() - .retryPolicy( - RetryPolicy.builder() - .numRetries(clientSettings.maxRetries) - .throttlingBackoffStrategy( - clientSettings.throttleRetries - ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) - : BackoffStrategy.none() - ) - .build() - ) + .retryPolicy(retryPolicy) .apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis)) .build(); } @@ -346,12 +346,7 @@ static AwsCredentialsProvider buildCredentials(Logger logger, S3ClientSettings c // valid paths. @SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path") private static void setDefaultAwsProfilePath() { - if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) { - System.setProperty(ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.property(), System.getProperty("opensearch.path.conf")); - } - if (ProfileFileSystemSetting.AWS_CONFIG_FILE.getStringValue().isEmpty()) { - System.setProperty(ProfileFileSystemSetting.AWS_CONFIG_FILE.property(), System.getProperty("opensearch.path.conf")); - } + S3Service.setDefaultAwsProfilePath(); } private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) { @@ -443,5 +438,6 @@ public AwsCredentials resolveCredentials() { @Override public void close() { releaseCachedClients(); + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1a402e8431e25..8690a5c91680a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -43,9 +43,6 @@ import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.Delete; -import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -55,9 +52,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectAttributes; -import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Error; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; @@ -68,7 +63,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ExceptionsHelper; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; @@ -101,11 +96,8 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -381,125 +373,17 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS } @Override - public DeleteResult delete() throws IOException { - final AtomicLong deletedBlobs = new AtomicLong(); - final AtomicLong deletedBytes = new AtomicLong(); - try (AmazonS3Reference clientReference = blobStore.clientReference()) { - ListObjectsV2Iterable listObjectsIterable = SocketAccess.doPrivileged( - () -> clientReference.get() - .listObjectsV2Paginator( - ListObjectsV2Request.builder() - .bucket(blobStore.bucket()) - .prefix(keyPath) - .overrideConfiguration( - o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher) - ) - .build() - ) - ); - - Iterator listObjectsResponseIterator = listObjectsIterable.iterator(); - while (listObjectsResponseIterator.hasNext()) { - ListObjectsV2Response listObjectsResponse = SocketAccess.doPrivileged(listObjectsResponseIterator::next); - List blobsToDelete = listObjectsResponse.contents().stream().map(s3Object -> { - deletedBlobs.incrementAndGet(); - deletedBytes.addAndGet(s3Object.size()); - - return s3Object.key(); - }).collect(Collectors.toList()); - - if (!listObjectsResponseIterator.hasNext()) { - blobsToDelete.add(keyPath); - } - - doDeleteBlobs(blobsToDelete, false); - } - } catch (SdkException e) { - throw new IOException("Exception when deleting blob container [" + keyPath + "]", e); - } - - return new DeleteResult(deletedBlobs.get(), deletedBytes.get()); + public DeleteResult delete() { + PlainActionFuture future = new PlainActionFuture<>(); + deleteAsync(future); + return future.actionGet(); } @Override - public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { - doDeleteBlobs(blobNames, true); - } - - private void doDeleteBlobs(List blobNames, boolean relative) throws IOException { - if (blobNames.isEmpty()) { - return; - } - final Set outstanding; - if (relative) { - outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet()); - } else { - outstanding = new HashSet<>(blobNames); - } - try (AmazonS3Reference clientReference = blobStore.clientReference()) { - // S3 API allows 1k blobs per delete so we split up the given blobs into requests of bulk size deletes - final List deleteRequests = new ArrayList<>(); - final List partition = new ArrayList<>(); - for (String key : outstanding) { - partition.add(key); - if (partition.size() == blobStore.getBulkDeletesSize()) { - deleteRequests.add(bulkDelete(blobStore.bucket(), partition)); - partition.clear(); - } - } - if (partition.isEmpty() == false) { - deleteRequests.add(bulkDelete(blobStore.bucket(), partition)); - } - SocketAccess.doPrivilegedVoid(() -> { - SdkException aex = null; - for (DeleteObjectsRequest deleteRequest : deleteRequests) { - List keysInRequest = deleteRequest.delete() - .objects() - .stream() - .map(ObjectIdentifier::key) - .collect(Collectors.toList()); - try { - DeleteObjectsResponse deleteObjectsResponse = clientReference.get().deleteObjects(deleteRequest); - outstanding.removeAll(keysInRequest); - outstanding.addAll(deleteObjectsResponse.errors().stream().map(S3Error::key).collect(Collectors.toSet())); - if (!deleteObjectsResponse.errors().isEmpty()) { - logger.warn( - () -> new ParameterizedMessage( - "Failed to delete some blobs {}", - deleteObjectsResponse.errors() - .stream() - .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]") - .collect(Collectors.toList()) - ) - ); - } - } catch (SdkException e) { - // The AWS client threw any unexpected exception and did not execute the request at all so we do not - // remove any keys from the outstanding deletes set. - aex = ExceptionsHelper.useOrSuppress(aex, e); - } - } - if (aex != null) { - throw aex; - } - }); - } catch (Exception e) { - throw new IOException("Failed to delete blobs [" + outstanding + "]", e); - } - assert outstanding.isEmpty(); - } - - private DeleteObjectsRequest bulkDelete(String bucket, List blobs) { - return DeleteObjectsRequest.builder() - .bucket(bucket) - .delete( - Delete.builder() - .objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList())) - .quiet(true) - .build() - ) - .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher)) - .build(); + public void deleteBlobsIgnoringIfNotExists(List blobNames) { + PlainActionFuture future = new PlainActionFuture<>(); + deleteBlobsAsyncIgnoringIfNotExists(blobNames, future); + future.actionGet(); } @Override @@ -886,7 +770,11 @@ public void deleteAsync(ActionListener completionListener) { try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) { S3AsyncClient s3AsyncClient = asyncClientReference.get().client(); - ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build(); + ListObjectsV2Request listRequest = ListObjectsV2Request.builder() + .bucket(blobStore.bucket()) + .prefix(keyPath) + .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher)) + .build(); ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest); AtomicLong deletedBlobs = new AtomicLong(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index 1048ec784ec4e..72a812339e387 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -93,19 +93,19 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer"; protected final S3Service service; - private final S3AsyncService s3AsyncService; + protected final S3AsyncService s3AsyncService; private final Path configPath; - private AsyncExecutorContainer urgentExecutorBuilder; - private AsyncExecutorContainer priorityExecutorBuilder; - private AsyncExecutorContainer normalExecutorBuilder; + protected AsyncExecutorContainer urgentExecutorBuilder; + protected AsyncExecutorContainer priorityExecutorBuilder; + protected AsyncExecutorContainer normalExecutorBuilder; private ExecutorService lowTransferQConsumerService; private ExecutorService normalTransferQConsumerService; - private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; - private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; - private TransferSemaphoresHolder transferSemaphoresHolder; - private GenericStatsMetricPublisher genericStatsMetricPublisher; + protected SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; + protected SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; + protected TransferSemaphoresHolder transferSemaphoresHolder; + protected GenericStatsMetricPublisher genericStatsMetricPublisher; public S3RepositoryPlugin(final Settings settings, final Path configPath) { this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath)); @@ -387,5 +387,8 @@ public void reload(Settings settings) { public void close() throws IOException { service.close(); s3AsyncService.close(); + urgentExecutorBuilder.getAsyncTransferEventLoopGroup().close(); + priorityExecutorBuilder.getAsyncTransferEventLoopGroup().close(); + normalExecutorBuilder.getAsyncTransferEventLoopGroup().close(); } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 2cb11541d924f..53371cd1529ce 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -48,6 +48,7 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.DeletedObject; import software.amazon.awssdk.services.s3.model.GetObjectAttributesParts; import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; @@ -92,7 +93,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -102,6 +102,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -286,9 +287,8 @@ public int numberOfPagesFetched() { } } - public void testDelete() throws IOException { + public void testDelete() throws Exception { final String bucketName = randomAlphaOfLengthBetween(1, 10); - final BlobPath blobPath = new BlobPath(); int bulkDeleteSize = 5; @@ -297,147 +297,314 @@ public void testDelete() throws IOException { when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize); - final S3Client client = mock(S3Client.class); - doAnswer(invocation -> new AmazonS3Reference(client)).when(blobStore).clientReference(); + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); - ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); final int totalPageCount = 3; final long s3ObjectSize = ByteSizeUnit.MB.toBytes(5); final int s3ObjectsPerPage = 5; - MockListObjectsV2ResponseIterator listObjectsV2ResponseIterator = new MockListObjectsV2ResponseIterator( - totalPageCount, - s3ObjectsPerPage, - s3ObjectSize - ); - when(listObjectsV2Iterable.iterator()).thenReturn(listObjectsV2ResponseIterator); - when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); - final List keysDeleted = new ArrayList<>(); - AtomicInteger deleteCount = new AtomicInteger(); + List responses = new ArrayList<>(); + List allObjects = new ArrayList<>(); + long totalSize = 0; + + for (int i = 0; i < totalPageCount; i++) { + List pageObjects = new ArrayList<>(); + for (int j = 0; j < s3ObjectsPerPage; j++) { + pageObjects.add(S3Object.builder().key(randomAlphaOfLength(10)).size(s3ObjectSize).build()); + totalSize += s3ObjectSize; + } + allObjects.addAll(pageObjects); + responses.add(ListObjectsV2Response.builder().contents(pageObjects).build()); + } + + AtomicInteger counter = new AtomicInteger(); doAnswer(invocation -> { - DeleteObjectsRequest deleteObjectsRequest = invocation.getArgument(0); - deleteCount.getAndIncrement(); - logger.info("Object sizes are{}", deleteObjectsRequest.delete().objects().size()); - keysDeleted.addAll(deleteObjectsRequest.delete().objects().stream().map(ObjectIdentifier::key).collect(Collectors.toList())); - return DeleteObjectsResponse.builder().build(); - }).when(client).deleteObjects(any(DeleteObjectsRequest.class)); + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + int currentCounter = counter.getAndIncrement(); + if (currentCounter < responses.size()) { + subscriber.onNext(responses.get(currentCounter)); + } + if (currentCounter == responses.size() - 1) { + subscriber.onComplete(); + } + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()) + ); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - DeleteResult deleteResult = blobContainer.delete(); - assertEquals(s3ObjectSize * s3ObjectsPerPage * totalPageCount, deleteResult.bytesDeleted()); - assertEquals(s3ObjectsPerPage * totalPageCount, deleteResult.blobsDeleted()); - // keysDeleted will have blobPath also - assertEquals(listObjectsV2ResponseIterator.getKeysListed().size(), keysDeleted.size() - 1); - assertTrue(keysDeleted.contains(blobPath.buildAsString())); - // keysDeleted will have blobPath also - assertEquals((int) Math.ceil(((double) keysDeleted.size() + 1) / bulkDeleteSize), deleteCount.get()); - keysDeleted.remove(blobPath.buildAsString()); - assertEquals(new HashSet<>(listObjectsV2ResponseIterator.getKeysListed()), new HashSet<>(keysDeleted)); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference resultRef = new AtomicReference<>(); + + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + resultRef.set(deleteResult); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Unexpected failure: " + e.getMessage()); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + DeleteResult result = resultRef.get(); + + assertEquals(totalSize, result.bytesDeleted()); + assertEquals(allObjects.size(), result.blobsDeleted()); + + verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class)); + int expectedDeleteCalls = (int) Math.ceil((double) allObjects.size() / bulkDeleteSize); + verify(s3AsyncClient, times(expectedDeleteCalls)).deleteObjects(any(DeleteObjectsRequest.class)); } - public void testDeleteItemLevelErrorsDuringDelete() { + public void testDeleteItemLevelErrorsDuringDelete() throws Exception { final String bucketName = randomAlphaOfLengthBetween(1, 10); - final BlobPath blobPath = new BlobPath(); final S3BlobStore blobStore = mock(S3BlobStore.class); when(blobStore.bucket()).thenReturn(bucketName); when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + int bulkDeleteSize = 3; // Small size to force multiple delete requests + when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize); - final S3Client client = mock(S3Client.class); - doAnswer(invocation -> new AmazonS3Reference(client)).when(blobStore).clientReference(); + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); - ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); - final int totalPageCount = 3; - final long s3ObjectSize = ByteSizeUnit.MB.toBytes(5); - final int s3ObjectsPerPage = 5; - MockListObjectsV2ResponseIterator listObjectsV2ResponseIterator = new MockListObjectsV2ResponseIterator( - totalPageCount, - s3ObjectsPerPage, - s3ObjectSize - ); - when(listObjectsV2Iterable.iterator()).thenReturn(listObjectsV2ResponseIterator); - when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + final int totalObjects = 10; + List s3Objects = new ArrayList<>(); + for (int i = 0; i < totalObjects; i++) { + s3Objects.add(S3Object.builder().key("key-" + i).size(100L).build()); + } - final List keysFailedDeletion = new ArrayList<>(); + AtomicBoolean onNext = new AtomicBoolean(false); doAnswer(invocation -> { - DeleteObjectsRequest deleteObjectsRequest = invocation.getArgument(0); - int i = 0; - for (ObjectIdentifier objectIdentifier : deleteObjectsRequest.delete().objects()) { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + if (onNext.compareAndSet(false, true)) { + subscriber.onNext(ListObjectsV2Response.builder().contents(s3Objects).build()); + } else { + subscriber.onComplete(); + } + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + // Simulate item-level errors during delete + AtomicInteger deleteCallCount = new AtomicInteger(0); + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenAnswer(invocation -> { + DeleteObjectsRequest request = invocation.getArgument(0); + List errors = new ArrayList<>(); + List deletedObjects = new ArrayList<>(); + + for (int i = 0; i < request.delete().objects().size(); i++) { if (i % 2 == 0) { - keysFailedDeletion.add(objectIdentifier.key()); + errors.add( + S3Error.builder() + .key(request.delete().objects().get(i).key()) + .code("InternalError") + .message("Simulated error") + .build() + ); + } else { + deletedObjects.add(DeletedObject.builder().key(request.delete().objects().get(i).key()).build()); } - i++; } - return DeleteObjectsResponse.builder() - .errors(keysFailedDeletion.stream().map(key -> S3Error.builder().key(key).build()).collect(Collectors.toList())) - .build(); - }).when(client).deleteObjects(any(DeleteObjectsRequest.class)); + + deleteCallCount.incrementAndGet(); + return CompletableFuture.completedFuture(DeleteObjectsResponse.builder().errors(errors).deleted(deletedObjects).build()); + }); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - assertThrows(AssertionError.class, blobContainer::delete); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference resultRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + resultRef.set(deleteResult); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + assertNull("Unexpected exception: " + exceptionRef.get(), exceptionRef.get()); + DeleteResult result = resultRef.get(); + assertNotNull("Expected DeleteResult but got null", result); + + // We expect half of the objects to be deleted successfully + // But as of today, the blob delete count and bytes is updated a bit earlier. + assertEquals(totalObjects, result.blobsDeleted()); + assertEquals(totalObjects * 100L, result.bytesDeleted()); + + verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class)); + + // Calculate expected number of deleteObjects calls + int expectedDeleteCalls = (int) Math.ceil((double) totalObjects / bulkDeleteSize); + assertEquals(expectedDeleteCalls, deleteCallCount.get()); } - public void testDeleteSdkExceptionDuringListOperation() { + public void testDeleteSdkExceptionDuringListOperation() throws Exception { final String bucketName = randomAlphaOfLengthBetween(1, 10); - final BlobPath blobPath = new BlobPath(); final S3BlobStore blobStore = mock(S3BlobStore.class); when(blobStore.bucket()).thenReturn(bucketName); when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - final S3Client client = mock(S3Client.class); - doAnswer(invocation -> new AmazonS3Reference(client)).when(blobStore).clientReference(); + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); - ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); - final int totalPageCount = 3; - final long s3ObjectSize = ByteSizeUnit.MB.toBytes(5); - final int s3ObjectsPerPage = 5; - MockListObjectsV2ResponseIterator listObjectsV2ResponseIterator = new MockListObjectsV2ResponseIterator( - totalPageCount, - s3ObjectsPerPage, - s3ObjectSize - ); - when(listObjectsV2Iterable.iterator()).thenReturn(listObjectsV2ResponseIterator); - when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscriber.onError(new RuntimeException("Simulated listing error")); + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - assertThrows(IOException.class, blobContainer::delete); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + fail("Expected failure but got success"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNotNull(exceptionRef.get()); + assertEquals(IOException.class, exceptionRef.get().getClass()); + assertEquals("Failed to list objects for deletion", exceptionRef.get().getMessage()); } - public void testDeleteSdkExceptionDuringDeleteOperation() { + public void testDeleteSdkExceptionDuringDeleteOperation() throws Exception { final String bucketName = randomAlphaOfLengthBetween(1, 10); - final BlobPath blobPath = new BlobPath(); + int bulkDeleteSize = 5; final S3BlobStore blobStore = mock(S3BlobStore.class); when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize); when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - final S3Client client = mock(S3Client.class); - doAnswer(invocation -> new AmazonS3Reference(client)).when(blobStore).clientReference(); + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); - ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); - final int totalPageCount = 3; - final long s3ObjectSize = ByteSizeUnit.MB.toBytes(5); - final int s3ObjectsPerPage = 5; - MockListObjectsV2ResponseIterator listObjectsV2ResponseIterator = new MockListObjectsV2ResponseIterator( - totalPageCount, - s3ObjectsPerPage, - s3ObjectSize - ); - when(listObjectsV2Iterable.iterator()).thenReturn(listObjectsV2ResponseIterator); - when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscriber.onNext( + ListObjectsV2Response.builder().contents(S3Object.builder().key("test-key").size(100L).build()).build() + ); + subscriber.onComplete(); + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); - when(client.deleteObjects(any(DeleteObjectsRequest.class))).thenThrow(SdkException.builder().build()); + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new RuntimeException("Simulated delete error")); + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - assertThrows(IOException.class, blobContainer::delete); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + fail("Expected failure but got success"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNotNull(exceptionRef.get()); + logger.error("", exceptionRef.get()); + assertTrue(exceptionRef.get() instanceof CompletionException); + assertEquals("java.lang.RuntimeException: Simulated delete error", exceptionRef.get().getMessage()); } public void testExecuteSingleUpload() throws IOException { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryPluginTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryPluginTests.java index 9ac1564c807c3..c0ee9cb6d980f 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryPluginTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryPluginTests.java @@ -8,6 +8,7 @@ package org.opensearch.repositories.s3; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.SizeUnit; import org.opensearch.common.unit.SizeValue; @@ -25,6 +26,8 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class S3RepositoryPluginTests extends OpenSearchTestCase { @@ -37,8 +40,6 @@ public void testGetExecutorBuilders() throws IOException { ThreadPool threadPool = null; try (S3RepositoryPlugin plugin = new S3RepositoryPlugin(settings, configPath)) { List> executorBuilders = plugin.getExecutorBuilders(settings); - assertNotNull(executorBuilders); - assertFalse(executorBuilders.isEmpty()); threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0])); final Executor executor = threadPool.executor(URGENT_FUTURE_COMPLETION); assertNotNull(executor); @@ -57,6 +58,12 @@ public void testGetExecutorBuilders() throws IOException { assertThat(info.getMax(), equalTo(size)); assertThat(openSearchThreadPoolExecutor.getMaximumPoolSize(), equalTo(size)); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + plugin.createComponents(null, clusterService, threadPool, null, null, null, null, null, null, null, null); + assertNotNull(executorBuilders); + assertFalse(executorBuilders.isEmpty()); + final int availableProcessors = Runtime.getRuntime().availableProcessors(); if (processors > availableProcessors) { assertWarnings( diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c4f3e771143b1..92dfd96531877 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -788,7 +788,6 @@ public void apply(Settings value, Settings current, Settings previous) { // Snapshot related Settings BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING, - BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD, // Composite index settings diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 0cbe49690e773..c23aefdceb065 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -50,7 +50,6 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; import org.opensearch.action.support.GroupedActionListener; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.RepositoryCleanupInProgress; @@ -70,7 +69,6 @@ import org.opensearch.common.Randomness; import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; @@ -429,16 +427,6 @@ protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, lo Setting.Property.Final ); - /** - * Controls the fixed prefix for the snapshot shard blob path. cluster.snapshot.async-deletion.enable - */ - public static final Setting SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING = Setting.boolSetting( - "cluster.snapshot.async-deletion.enable", - true, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - protected volatile boolean supportURLRepo; private volatile int maxShardBlobDeleteBatch; @@ -532,8 +520,6 @@ protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, lo private final String snapshotShardPathPrefix; - private volatile boolean enableAsyncDeletion; - protected final long repositoryDataCacheThreshold; /** @@ -591,8 +577,6 @@ protected BlobStoreRepository( this.recoverySettings = recoverySettings; this.remoteStoreSettings = new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings()); this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings()); - this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings()); - clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion); this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes(); } @@ -2293,15 +2277,7 @@ private void executeOneStaleIndexDelete( private DeleteResult deleteContainer(BlobContainer container) throws IOException { long startTime = System.nanoTime(); - DeleteResult deleteResult; - if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) { - // Use deleteAsync and wait for the result - PlainActionFuture future = new PlainActionFuture<>(); - ((AsyncMultiStreamBlobContainer) container).deleteAsync(future); - deleteResult = future.actionGet(); - } else { - deleteResult = container.delete(); - } + DeleteResult deleteResult = container.delete(); logger.debug(new ParameterizedMessage("[{}] Deleted {} in {}ns", metadata.name(), container.path(), startTime - System.nanoTime())); return deleteResult; } @@ -2961,13 +2937,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna private void deleteFromContainer(BlobContainer container, List blobs) throws IOException { logger.trace(() -> new ParameterizedMessage("[{}] Deleting {} from [{}]", metadata.name(), blobs, container.path())); long startTime = System.nanoTime(); - if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) { - PlainActionFuture future = new PlainActionFuture<>(); - ((AsyncMultiStreamBlobContainer) container).deleteBlobsAsyncIgnoringIfNotExists(blobs, future); - future.actionGet(); - } else { - container.deleteBlobsIgnoringIfNotExists(blobs); - } + container.deleteBlobsIgnoringIfNotExists(blobs); logger.debug( () -> new ParameterizedMessage( "[{}] Deletion {} from [{}] took {}ns", @@ -4894,8 +4864,4 @@ public String toString() { return name; } } - - public void setEnableAsyncDeletion(boolean enableAsyncDeletion) { - this.enableAsyncDeletion = enableAsyncDeletion; - } }