From 08c9432bdb841ec1bdf4904dded703896141d942 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Tue, 2 Apr 2024 19:02:00 +0530 Subject: [PATCH 1/6] Introduce interface changes to read/write blob with object metadata Signed-off-by: Sandeep Kumawat --- .../common/blobstore/BlobContainer.java | 69 +++++++++++++++++++ .../blobstore/BlobDownloadResponse.java | 44 ++++++++++++ .../transfer/BlobStoreTransferService.java | 6 ++ .../translog/transfer/TransferService.java | 10 +++ 4 files changed, 129 insertions(+) create mode 100644 server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 2e25a532b5abf..3776007e9e84c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -77,6 +77,19 @@ public interface BlobContainer { */ InputStream readBlob(String blobName) throws IOException; + /** + * Creates a new {@link BlobDownloadResponse} for the given blob name. + * + * @param blobName + * The name of the blob to get an {@link InputStream} for. + * @return The {@code InputStream} to read the blob. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. + */ + default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { + return null; + }; + /** * Creates a new {@link InputStream} that can be used to read the given blob starting from * a specific {@code position} in the blob. The {@code length} is an indication of the @@ -128,6 +141,33 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException {}; + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. @@ -149,6 +189,35 @@ default long readBlobPreferredLength() { */ void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata + * using an atomic write operation if the implementation supports it. + *

+ * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobAtomicWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException {}; + /** * Deletes this container and all its contents from the repository. * diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java new file mode 100644 index 0000000000000..c03570c71164c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import java.io.InputStream; +import java.util.Map; + +/** + * A class for blob download response + * + * @opensearch.internal + */ +public class BlobDownloadResponse { + + /** + * Downloaded blob InputStream + */ + private InputStream inputStream; + + /** + * Metadata of the downloaded blob + */ + private Map metadata; + + public InputStream getInputStream() { + return inputStream; + } + + public Map getMetadata() { + return metadata; + } + + public BlobDownloadResponse(InputStream inputStream, Map metadata) { + this.inputStream = inputStream; + this.metadata = metadata; + } + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 82dd6301ef79f..5ab0b9d5fa870 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -14,6 +14,7 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I return blobStore.blobContainer((BlobPath) path).readBlob(fileName); } + @Override + public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); + } + @Override public void deleteBlobs(Iterable path, List fileNames) throws IOException { blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index cfe833dde87eb..a11532dc69f1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -125,6 +126,15 @@ void uploadBlobs( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + /** + * + * @param path the remote path from where download should be made + * @param fileName the name of the file + * @return {@link BlobDownloadResponse} of the remote file + * @throws IOException the exception while reading the data + */ + BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); void listAllInSortedOrderAsync( From 950e55bb7239436f3496be68891fc6668dac133b Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Wed, 3 Apr 2024 01:33:39 +0530 Subject: [PATCH 2/6] empty commit Signed-off-by: Sandeep Kumawat From 85711ae52ee793e5513524268eb46b7dc30a93df Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Wed, 3 Apr 2024 15:35:59 +0530 Subject: [PATCH 3/6] provide metadata in remoteTransferContainer for async translog upload flow Signed-off-by: Sandeep Kumawat --- .../s3/S3BlobContainerMockClientTests.java | 6 +-- .../s3/S3BlobContainerRetriesTests.java | 2 +- .../blobstore/stream/write/WriteContext.java | 13 +++++- .../transfer/RemoteTransferContainer.java | 40 ++++++++++++++++++- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 8c7e196d7c812..607f186f2db1e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -483,7 +483,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -533,7 +533,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -644,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index ceab06bd051e9..70f41e647648e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -344,7 +344,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); } - }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener); + }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index e74462f82400d..3373f32845c27 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -13,6 +13,7 @@ import org.opensearch.common.StreamContext; import java.io.IOException; +import java.util.Map; /** * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams @@ -29,6 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private Map metadata; /** * Construct a new WriteContext object @@ -49,7 +51,8 @@ public WriteContext( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - @Nullable Long expectedChecksum + @Nullable Long expectedChecksum, + Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier; @@ -59,6 +62,7 @@ public WriteContext( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.metadata = metadata; } /** @@ -131,4 +135,11 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + /** + * @return the upload metadata. + */ + public Map getMetadata() { + return metadata; + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 2047c99d9e13b..8a0f7cdb4a474 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); + private Map metadata = null; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -90,6 +92,41 @@ public RemoteTransferContainer( this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; } + /** + * Construct a new RemoteTransferContainer object with metadata. + * + * @param fileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + * @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks + * @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification + * @param metadata Object metadata to be store with the file. + */ + public RemoteTransferContainer( + String fileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, + long expectedChecksum, + boolean isRemoteDataIntegritySupported, + Map metadata + ) { + this.fileName = fileName; + this.remoteFileName = remoteFileName; + this.contentLength = contentLength; + this.failTransferIfFileExists = failTransferIfFileExists; + this.writePriority = writePriority; + this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; + this.expectedChecksum = expectedChecksum; + this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = metadata; + } + /** * @return The {@link WriteContext} for the current upload */ @@ -102,7 +139,8 @@ public WriteContext createWriteContext() { writePriority, this::finalizeUpload, isRemoteDataIntegrityCheckPossible(), - isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null + isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null, + metadata ); } From 3a41ce3a54cb988fd21ecb61523592e2b8b662ba Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 01:21:20 +0530 Subject: [PATCH 4/6] introduce metadata support in ReadContext Signed-off-by: Sandeep Kumawat --- .../opensearch/repositories/s3/S3BlobContainer.java | 2 +- .../multipart/mocks/MockFsAsyncBlobContainer.java | 2 +- .../common/blobstore/stream/read/ReadContext.java | 10 +++++++++- .../AsyncMultiStreamEncryptedBlobContainerTests.java | 4 ++-- .../stream/read/listener/ReadContextListenerTests.java | 8 ++++---- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 25f361b40636e..56589a981003a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); + listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum, null)); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 36987ac2d4991..fd089391038f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null); + ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null, null); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 1264551401b4c..02be0df346a5f 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.InputStreamContainer; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -25,23 +26,30 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; + private Map metadata; - public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum) { + public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; this.asyncPartStreams = asyncPartStreams; this.blobChecksum = blobChecksum; + this.metadata = metadata; } public ReadContext(ReadContext readContext) { this.blobSize = readContext.blobSize; this.asyncPartStreams = readContext.asyncPartStreams; this.blobChecksum = readContext.blobChecksum; + this.metadata = readContext.metadata; } public String getBlobChecksum() { return blobChecksum; } + public Map getMetadata() { + return metadata; + } + public int getNumberOfParts() { return asyncPartStreams.size(); } diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 1780819390052..6b2fd05ed4bfe 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,7 +57,7 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -103,7 +103,7 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 0163c2275e7f4..b9cf08fa4acb1 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,7 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -125,7 +125,7 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -178,7 +178,7 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -203,7 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); From d95e5c9ced4d1c44dede07cd707e14f2374b5ac7 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 13:46:26 +0530 Subject: [PATCH 5/6] empty commit Signed-off-by: Sandeep Kumawat From e63590c1e6f518b6952ec352ba9ffc8284bc9319 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 14:31:47 +0530 Subject: [PATCH 6/6] minor changes Signed-off-by: Sandeep Kumawat --- .../opensearch/common/blobstore/stream/read/ReadContext.java | 2 +- .../opensearch/common/blobstore/stream/write/WriteContext.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 02be0df346a5f..6877348f2fc15 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -26,7 +26,7 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; - private Map metadata; + private final Map metadata; public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index 3373f32845c27..ed77350381584 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -30,7 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; - private Map metadata; + private final Map metadata; /** * Construct a new WriteContext object @@ -77,6 +77,7 @@ protected WriteContext(WriteContext writeContext) { this.uploadFinalizer = writeContext.uploadFinalizer; this.doRemoteDataIntegrityCheck = writeContext.doRemoteDataIntegrityCheck; this.expectedChecksum = writeContext.expectedChecksum; + this.metadata = writeContext.getMetadata(); } /**