From fe8ba21c32a98a7f4a971e41024dd2017f00de16 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Mon, 8 Apr 2024 13:04:54 +0530 Subject: [PATCH] Implement interface changes for s3 plugin to read/write blob with object metadata Signed-off-by: Sandeep Kumawat --- .../common/io/InputStreamContainer.java | 12 ++- .../repositories/s3/S3BlobContainer.java | 82 +++++++++++++++++-- .../s3/S3RetryingInputStream.java | 7 ++ .../s3/async/AsyncTransferManager.java | 2 + .../repositories/s3/async/UploadRequest.java | 11 ++- .../s3/S3BlobContainerMockClientTests.java | 15 +++- .../s3/S3BlobContainerRetriesTests.java | 61 ++++++++++++-- .../s3/S3BlobStoreContainerTests.java | 32 ++++++-- .../s3/async/AsyncTransferManagerTests.java | 16 ++-- .../mocks/MockFsAsyncBlobContainer.java | 7 +- ...syncMultiStreamEncryptedBlobContainer.java | 2 +- .../blobstore/EncryptedBlobContainer.java | 2 +- .../transfer/RemoteTransferContainer.java | 2 +- ...ultiStreamEncryptedBlobContainerTests.java | 4 +- .../read/listener/FilePartWriterTests.java | 6 +- .../listener/ReadContextListenerTests.java | 6 +- 16 files changed, 217 insertions(+), 50 deletions(-) diff --git a/libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java b/libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java index 3095336338f7f..cc5d60b0964a1 100644 --- a/libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java +++ b/libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import java.io.InputStream; +import java.util.Map; /** * Model composed of an input stream and the total content length of the stream @@ -23,6 +24,7 @@ public class InputStreamContainer { private final InputStream inputStream; private final long contentLength; private final long offset; + private final Map metadata; /** * Construct a new stream object @@ -30,10 +32,11 @@ public class InputStreamContainer { * @param inputStream The input stream that is to be encapsulated * @param contentLength The total content length that is to be read from the stream */ - public InputStreamContainer(InputStream inputStream, long contentLength, long offset) { + public InputStreamContainer(InputStream inputStream, long contentLength, long offset, Map metadata) { this.inputStream = inputStream; this.contentLength = contentLength; this.offset = offset; + this.metadata = metadata; } /** @@ -56,4 +59,11 @@ public long getContentLength() { public long getOffset() { return offset; } + + /** + * @return metadata of the source content. + */ + public Map getMetadata() { + return metadata; + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1f23a09a047f2..5e3bdf5269cf4 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -73,6 +73,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; @@ -138,6 +139,13 @@ public boolean blobExists(String blobName) { } } + @ExperimentalApi + @Override + public BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { + S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName)); + return new BlobDownloadResponse(s3RetryingInputStream, s3RetryingInputStream.getMetadata()); + } + @Override public InputStream readBlob(String blobName) throws IOException { return new S3RetryingInputStream(blobStore, buildKey(blobName)); @@ -172,9 +180,32 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; SocketAccess.doPrivilegedIOException(() -> { if (blobSize <= getLargeBlobThresholdInBytes()) { - executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize); + executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, null); } else { - executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize); + executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, null); + } + return null; + }); + } + + /** + * Write blob with its object metadata. + */ + @ExperimentalApi + @Override + public void writeBlobWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; + SocketAccess.doPrivilegedIOException(() -> { + if (blobSize <= getLargeBlobThresholdInBytes()) { + executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); + } else { + executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } return null; }); @@ -190,7 +221,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp writeContext.getUploadFinalizer(), writeContext.doRemoteDataIntegrityCheck(), writeContext.getExpectedChecksum(), - blobStore.isUploadRetryEnabled() + blobStore.isUploadRetryEnabled(), + writeContext.getMetadata() ); try { if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) { @@ -203,7 +235,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp blobStore, uploadRequest.getKey(), inputStream.getInputStream(), - uploadRequest.getContentLength() + uploadRequest.getContentLength(), + uploadRequest.getMetadata() ); completionListener.onResponse(null); } catch (Exception ex) { @@ -309,6 +342,18 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } + @ExperimentalApi + @Override + public void writeBlobAtomicWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + writeBlobWithMetadata(blobName, inputStream, metadata, blobSize, failIfAlreadyExists); + } + @Override public DeleteResult delete() throws IOException { final AtomicLong deletedBlobs = new AtomicLong(); @@ -542,8 +587,13 @@ private String buildKey(String blobName) { /** * Uploads a blob using a single upload request */ - void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) - throws IOException { + void executeSingleUpload( + final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize, + final Map metadata + ) throws IOException { // Extra safety checks if (blobSize > MAX_FILE_SIZE.getBytes()) { @@ -560,6 +610,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin .storageClass(blobStore.getStorageClass()) .acl(blobStore.getCannedACL()) .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher)); + + if (metadata != null) { + putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata); + } if (blobStore.serverSideEncryption()) { putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } @@ -583,8 +637,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin /** * Uploads a blob using multipart upload requests. */ - void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) - throws IOException { + void executeMultipartUpload( + final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize, + final Map metadata + ) throws IOException { ensureMultiPartUploadSize(blobSize); final long partSize = blobStore.bufferSizeInBytes(); @@ -609,6 +668,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, .acl(blobStore.getCannedACL()) .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector)); + if (metadata != null) { + createMultipartUploadRequestBuilder.metadata(metadata); + } + if (blobStore.serverSideEncryption()) { createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } @@ -767,11 +830,12 @@ static InputStreamContainer transformResponseToInputStreamContainer( final GetObjectResponse getObjectResponse = streamResponse.response(); final String contentRange = getObjectResponse.contentRange(); final Long contentLength = getObjectResponse.contentLength(); + final Map metadata = getObjectResponse.metadata(); if ((isMultipartObject && contentRange == null) || contentLength == null) { throw SdkException.builder().message("Failed to fetch required metadata for blob part").build(); } final long offset = isMultipartObject ? HttpRangeUtils.getStartOffsetFromRangeHeader(getObjectResponse.contentRange()) : 0L; - return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset); + return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset, metadata); } /** diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java index d7e47e0ab1bcc..9459a9ab9d88b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java @@ -48,6 +48,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream { private long currentOffset; private boolean closed; private boolean eof; + private Map metadata; S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException { this(blobStore, blobKey, 0, Long.MAX_VALUE - 1); @@ -122,6 +124,7 @@ private void openStream() throws IOException { getObjectResponseInputStream.response().contentLength() ); this.currentStream = getObjectResponseInputStream; + this.metadata = getObjectResponseInputStream.response().metadata(); this.isStreamAborted.set(false); } catch (final SdkException e) { if (e instanceof S3Exception) { @@ -265,4 +268,8 @@ boolean isEof() { boolean isAborted() { return isStreamAborted.get(); } + + public Map getMetadata() { + return this.metadata; + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 2259780c95276..9d74ecc64f480 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -129,6 +129,7 @@ private void uploadInParts( CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder() .bucket(uploadRequest.getBucket()) + .metadata(uploadRequest.getMetadata()) .key(uploadRequest.getKey()) .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)); if (uploadRequest.doRemoteDataIntegrityCheck()) { @@ -324,6 +325,7 @@ private void uploadInOneChunk( ) { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() .bucket(uploadRequest.getBucket()) + .metadata(uploadRequest.getMetadata()) .key(uploadRequest.getKey()) .contentLength(uploadRequest.getContentLength()) .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher)); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java index a5304dc4a97d6..7cfbcd85c13cc 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java @@ -12,6 +12,7 @@ import org.opensearch.common.blobstore.stream.write.WritePriority; import java.io.IOException; +import java.util.Map; /** * A model encapsulating all details for an upload to S3 @@ -24,8 +25,8 @@ public class UploadRequest { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; - private boolean uploadRetryEnabled; + private final Map metadata; /** * Construct a new UploadRequest object @@ -46,7 +47,8 @@ public UploadRequest( CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, Long expectedChecksum, - boolean uploadRetryEnabled + boolean uploadRetryEnabled, + Map metadata ) { this.bucket = bucket; this.key = key; @@ -56,6 +58,7 @@ public UploadRequest( this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; this.uploadRetryEnabled = uploadRetryEnabled; + this.metadata = metadata; } public String getBucket() { @@ -89,4 +92,8 @@ public Long getExpectedChecksum() { public boolean isUploadRetryEnabled() { return uploadRetryEnabled; } + + public Map getMetadata() { + return metadata; + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 9e830c409a58b..abc838a5a3c70 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -75,6 +75,7 @@ import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -471,7 +472,7 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); + return new InputStreamContainer(inputStream, size, position, null); }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); CheckedConsumer uploadFinalizer = uploadSuccess -> { @@ -527,7 +528,7 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); + return new InputStreamContainer(inputStream, size, position, null); }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); CheckedConsumer uploadFinalizer = uploadSuccess -> { @@ -649,7 +650,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); + return new InputStreamContainer(inputStream, size, position, null); }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") @@ -668,7 +669,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t } else { assertNull(exceptionRef.get()); } - verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong()); + verify(s3BlobContainer, times(1)).executeMultipartUpload( + any(S3BlobStore.class), + anyString(), + any(InputStream.class), + anyLong(), + anyMap() + ); if (expectException) { verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 8e25ba4d950ef..acd9b3539dfe1 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -79,8 +79,10 @@ import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -240,7 +242,7 @@ public InputStream readBlob(String blobName, long position, long length) throws }; } - public void testWriteBlobWithRetries() throws Exception { + public void writeBlobWithRetriesHelper(Map metadata) throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); @@ -280,11 +282,26 @@ public void testWriteBlobWithRetries() throws Exception { final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null); try (InputStream stream = new ByteArrayInputStream(bytes)) { - blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false); + if (metadata != null) { + blobContainer.writeBlobWithMetadata("write_blob_max_retries", stream, metadata, bytes.length, false); + } else { + blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false); + } } assertThat(countDown.isCountedDown(), is(true)); } + public void testWriteBlobWithMetadataWithRetries() throws Exception { + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + writeBlobWithRetriesHelper(metadata); + } + + public void testWriteBlobWithRetries() throws Exception { + writeBlobWithRetriesHelper(null); + } + public void testWriteBlobByStreamsWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); @@ -335,7 +352,7 @@ public void testWriteBlobByStreamsWithRetries() throws Exception { StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); + return new InputStreamContainer(inputStream, size, position, null); }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") @@ -368,7 +385,7 @@ private int calculateNumberOfParts(long contentLength, long partSize) { return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); } - public void testWriteBlobWithReadTimeouts() { + public void writeBlobWithReadTimeoutsHelper(Map metadata) { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null); @@ -386,7 +403,11 @@ public void testWriteBlobWithReadTimeouts() { Exception exception = expectThrows(IOException.class, () -> { try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) { - blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false); + if (metadata != null) { + blobContainer.writeBlobWithMetadata("write_blob_timeout", stream, metadata, bytes.length, false); + } else { + blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false); + } } }); assertThat( @@ -401,7 +422,18 @@ public void testWriteBlobWithReadTimeouts() { assertThat(exception.getCause().getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); } - public void testWriteLargeBlob() throws Exception { + public void testWriteBlobWithMetadataWithReadTimeouts() throws Exception { + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + writeBlobWithReadTimeoutsHelper(metadata); + } + + public void testWriteBlobWithReadTimeouts() throws Exception { + writeBlobWithReadTimeoutsHelper(null); + } + + public void WriteLargeBlobHelper(Map metadata) throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); @@ -487,13 +519,28 @@ public void testWriteLargeBlob() throws Exception { } }); - blobContainer.writeBlob("write_large_blob", new ZeroInputStream(blobSize), blobSize, false); + if (metadata != null) { + blobContainer.writeBlobWithMetadata("write_large_blob", new ZeroInputStream(blobSize), metadata, blobSize, false); + } else { + blobContainer.writeBlob("write_large_blob", new ZeroInputStream(blobSize), blobSize, false); + } assertThat(countDownInitiate.isCountedDown(), is(true)); assertThat(countDownUploads.get(), equalTo(0)); assertThat(countDownComplete.isCountedDown(), is(true)); } + public void testWriteLargeBlobWithMetadata() throws Exception { + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + WriteLargeBlobHelper(metadata); + } + + public void testWriteLargeBlob() throws Exception { + WriteLargeBlobHelper(null); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it is closed */ diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 2b45e9cfe2d4b..654d8a72690c4 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -90,6 +90,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -125,7 +126,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + () -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null) ); assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage()); } @@ -139,7 +140,13 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeSingleUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), ByteSizeUnit.MB.toBytes(2)) + () -> blobContainer.executeSingleUpload( + blobStore, + blobName, + new ByteArrayInputStream(new byte[0]), + ByteSizeUnit.MB.toBytes(2), + null + ) ); assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage()); } @@ -430,6 +437,10 @@ public void testExecuteSingleUpload() throws IOException { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); + final Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + final BlobPath blobPath = new BlobPath(); if (randomBoolean()) { IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); @@ -467,7 +478,7 @@ public void testExecuteSingleUpload() throws IOException { ); final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]); - blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize); + blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize, metadata); final PutObjectRequest request = putObjectRequestArgumentCaptor.getValue(); final RequestBody requestBody = requestBodyArgumentCaptor.getValue(); @@ -480,6 +491,7 @@ public void testExecuteSingleUpload() throws IOException { assertEquals(blobSize, request.contentLength().longValue()); assertEquals(storageClass, request.storageClass()); assertEquals(cannedAccessControlList, request.acl()); + assertEquals(metadata, request.metadata()); if (serverSideEncryption) { assertEquals(ServerSideEncryption.AES256, request.serverSideEncryption()); } @@ -492,7 +504,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null) ); assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage()); } @@ -504,7 +516,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null) ); assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage()); } @@ -513,6 +525,10 @@ public void testExecuteMultipartUpload() throws IOException { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); + final Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + final BlobPath blobPath = new BlobPath(); if (randomBoolean()) { IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); @@ -577,13 +593,15 @@ public void testExecuteMultipartUpload() throws IOException { final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize); + blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize, metadata); final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestArgumentCaptor.getValue(); assertEquals(bucketName, initRequest.bucket()); assertEquals(blobPath.buildAsString() + blobName, initRequest.key()); assertEquals(storageClass, initRequest.storageClass()); assertEquals(cannedAccessControlList, initRequest.acl()); + assertEquals(metadata, initRequest.metadata()); + if (serverSideEncryption) { assertEquals(ServerSideEncryption.AES256, initRequest.serverSideEncryption()); } @@ -686,7 +704,7 @@ public void testExecuteMultipartUploadAborted() { final IOException e = expectThrows(IOException.class, () -> { final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize); + blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize, null); }); assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage()); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index b753b847df869..895718102af82 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -82,10 +82,10 @@ public void testOneChunkUpload() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null, true), + }, false, null, true, null), new StreamContext((partIdx, partSize, position) -> { streamRef.set(new ZeroInputStream(partSize)); - return new InputStreamContainer(streamRef.get(), partSize, position); + return new InputStreamContainer(streamRef.get(), partSize, position, null); }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1), new StatsMetricPublisher() ); @@ -127,9 +127,9 @@ public void testOneChunkUploadCorruption() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null, true), + }, false, null, true, null), new StreamContext( - (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), + (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position, null), ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1 @@ -180,11 +180,11 @@ public void testMultipartUpload() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 3376132981L, true), + }, true, 3376132981L, true, null), new StreamContext((partIdx, partSize, position) -> { InputStream stream = new ZeroInputStream(partSize); streams.add(stream); - return new InputStreamContainer(stream, partSize, position); + return new InputStreamContainer(stream, partSize, position, null); }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5), new StatsMetricPublisher() ); @@ -240,9 +240,9 @@ public void testMultipartUploadCorruption() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 0L, true), + }, true, 0L, true, null), new StreamContext( - (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), + (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position, null), ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5 diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index d45b4e3deb798..3e37741000289 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -128,7 +128,12 @@ public void readBlobAsync(String blobName, ActionListener listener) List blobPartStreams = new ArrayList<>(); for (int partNumber = 0; partNumber < numberOfParts; partNumber++) { long offset = partNumber * partSize; - InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); + InputStreamContainer blobPartStream = new InputStreamContainer( + readBlob(blobName, offset, partSize), + partSize, + offset, + null + ); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build(); diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java index 82bc7a0baed50..a73bfbb2e1cfc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java @@ -168,7 +168,7 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in long adjustedLength = decryptedStreamProvider.getAdjustedRange()[1] - adjustedPos + 1; final InputStream decryptedStream = decryptedStreamProvider.getDecryptedStreamProvider() .apply(inputStreamContainer.getInputStream()); - return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos); + return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos, null); } } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobContainer.java index d0933741339d9..d041a31e2334a 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobContainer.java @@ -84,7 +84,7 @@ public long readBlobPreferredLength() { private void executeWrite(InputStream inputStream, long blobSize, CheckedBiConsumer writeConsumer) throws IOException { T cryptoContext = cryptoHandler.initEncryptionMetadata(); - InputStreamContainer streamContainer = new InputStreamContainer(inputStream, blobSize, 0); + InputStreamContainer streamContainer = new InputStreamContainer(inputStream, blobSize, 0, null); InputStreamContainer encryptedStream = cryptoHandler.createEncryptingStream(cryptoContext, streamContainer); long cryptoLength = cryptoHandler.estimateEncryptedLengthOfEntireContent(cryptoContext, blobSize); writeConsumer.accept(encryptedStream.getInputStream(), cryptoLength); diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index cd2ef22327ebb..6042e3d84bbc6 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -221,7 +221,7 @@ private LocalStreamSupplier getMultipartStreamSupplier( inputStream = offsetRangeInputStream; } - return new InputStreamContainer(inputStream, size, position); + return new InputStreamContainer(inputStream, size, position, null); } catch (IOException e) { log.error("Failed to create input stream", e); throw e; diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index aee4ae40d16a5..a92ccabb9920d 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -53,7 +53,7 @@ public void testReadBlobAsync() throws Exception { final byte[] data = new byte[size]; Randomness.get().nextBytes(data); - final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0); + final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0, null); final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); @@ -99,7 +99,7 @@ public void testReadBlobAsyncException() throws Exception { // Objects needed for API call final byte[] data = new byte[size]; Randomness.get().nextBytes(data); - final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0); + final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0, null); final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java index f2a758b9bbe10..359eca54cf0fa 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java @@ -32,7 +32,7 @@ public void testFilePartWriter() throws Exception { Path segmentFilePath = path.resolve(UUID.randomUUID().toString()); int contentLength = 100; InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength)); - InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0); + InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0, null); FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity()); @@ -45,7 +45,7 @@ public void testFilePartWriterWithOffset() throws Exception { int contentLength = 100; int offset = 10; InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength)); - InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset); + InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset, null); FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity()); @@ -57,7 +57,7 @@ public void testFilePartWriterLargeInput() throws Exception { Path segmentFilePath = path.resolve(UUID.randomUUID().toString()); int contentLength = 20 * 1024 * 1024; InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength)); - InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0); + InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0, null); FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity()); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index c47874f3ba294..2e745abf4a910 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -121,7 +121,7 @@ public int available() { blobPartStreams.add( NUMBER_OF_PARTS, () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS), + () -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS, null), threadPool.generic() ) ); @@ -174,7 +174,7 @@ public int read(byte[] b) throws IOException { blobPartStreams.add( NUMBER_OF_PARTS, () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS), + () -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS, null), threadPool.generic() ) ); @@ -219,7 +219,7 @@ private List initializeBlobPartStreams() { int finalPartNumber = partNumber; blobPartStreams.add( () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE), + () -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE, null), threadPool.generic() ) );