From 394753152c472a8e1d70da742df612f7822b7a93 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Tue, 7 Jan 2025 17:28:49 +0530 Subject: [PATCH] Addressing comments Signed-off-by: Shubh Sahu --- .../opensearch/index/shard/IndexShard.java | 16 +++------- .../opensearch/repositories/Repository.java | 2 +- .../blobstore/BlobStoreRepository.java | 31 ++++++++++++++++--- .../snapshots/SnapshotShardsService.java | 15 +++++---- 4 files changed, 42 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ce4f0d01f74ca..f5de4dfb5a933 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1625,25 +1625,19 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { } /** - * reads the last metadata file from remote store and fetches files present in commit and their sizes. - * @return Tuple(Tuple(primaryTerm, commitGeneration), indexFilesToFileLengthMap) + * Fetches the last remote uploaded segment metadata file + * @return {@link RemoteSegmentMetadata} * @throws IOException */ - public Tuple, Map> acquireLastRemoteUploadedIndexCommit() throws IOException { + public RemoteSegmentMetadata fetchLastRemoteUploadedSegmentMetadata() throws IOException { if (!indexSettings.isAssignedOnRemoteNode()) { throw new IllegalStateException("Index is not assigned on Remote Node"); } RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile(); if (lastUploadedMetadata == null) { - throw new IllegalStateException("No metadata file found in remote store"); + throw new FileNotFoundException("No metadata file found in remote store"); } - final Map indexFilesToFileLengthMap = lastUploadedMetadata.getMetadata() - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength())); - long primaryTerm = lastUploadedMetadata.getPrimaryTerm(); - long commitGeneration = lastUploadedMetadata.getGeneration(); - return new Tuple<>(new Tuple<>(primaryTerm, commitGeneration), indexFilesToFileLengthMap); + return lastUploadedMetadata; } /** diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index eeadc25cac3c9..259c4a6e09ce7 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -420,7 +420,7 @@ default void snapshotRemoteStoreIndexShard( * Adds a reference of remote store data for a index commit point. *

* The index commit point can be obtained by using {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method. - * Or for closed index can be obtained by reading last remote uploaded metadata by using {@link org.opensearch.index.shard.IndexShard#acquireLastRemoteUploadedIndexCommit} method. + * Or for closed index can be obtained by reading last remote uploaded metadata by using {@link org.opensearch.index.shard.IndexShard#fetchLastRemoteUploadedSegmentMetadata()} method. * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index e6c6fabd848a4..012c064b51e72 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3744,6 +3744,33 @@ private void writeAtomic(BlobContainer container, final String blobName, final B } } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + @Nullable String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long startTime, + ActionListener listener + ) { + snapshotRemoteStoreIndexShard( + store, + snapshotId, + indexId, + snapshotIndexCommit, + shardStateIdentifier, + snapshotStatus, + primaryTerm, + snapshotIndexCommit.getGeneration(), + startTime, + null, + listener + ); + } + @Override public void snapshotRemoteStoreIndexShard( Store store, @@ -3762,10 +3789,6 @@ public void snapshotRemoteStoreIndexShard( listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository")); return; } - if (snapshotIndexCommit == null && indexFilesToFileLengthMap == null) { - listener.onFailure(new RepositoryException(metadata.name(), "both snapshot index commit and index files map cannot be null")); - return; - } final ShardId shardId = store.shardId(); try { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index ffcee94cd6f95..1e2264593310d 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -48,7 +48,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; @@ -65,6 +64,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.IndexShardSnapshotStatus.Stage; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; @@ -407,10 +407,13 @@ private void snapshot( try { if (closedIndex) { - final Tuple, Map> tuple = indexShard.acquireLastRemoteUploadedIndexCommit(); - primaryTerm = tuple.v1().v1(); - commitGeneration = tuple.v1().v2(); - indexFilesToFileLengthMap = tuple.v2(); + RemoteSegmentMetadata lastRemoteUploadedIndexCommit = indexShard.fetchLastRemoteUploadedSegmentMetadata(); + indexFilesToFileLengthMap = lastRemoteUploadedIndexCommit.getMetadata() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength())); + primaryTerm = lastRemoteUploadedIndexCommit.getPrimaryTerm(); + commitGeneration = lastRemoteUploadedIndexCommit.getGeneration(); } else { wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); snapshotIndexCommit = wrappedSnapshot.get(); @@ -420,7 +423,7 @@ private void snapshot( } catch (IOException e) { if (closedIndex) { logger.warn("Exception while reading latest metadata file from remote store"); - throw e; + listener.onFailure(e); } else { wrappedSnapshot.close(); logger.warn(