Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore remote index shards with ExistingStoreRecoverySource after restore from remote state #10665

Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

protected void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
restore(randomBoolean(), indices);
}

protected void restore(boolean restoreAllShards, String... indices) {
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ private void resetCluster(int dataNodeCount, int clusterManagerNodeCount) {
internalCluster().startDataOnlyNodes(dataNodeCount);
}

protected void verifyRedIndicesAndTriggerRestore(Map<String, Long> indexStats, String indexName, boolean indexMoreDocs)
throws Exception {
ensureRed(indexName);
restore(false, indexName);
verifyRestoredData(indexStats, indexName, indexMoreDocs);
}

public void testFullClusterRestore() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
Expand All @@ -83,7 +90,7 @@ public void testFullClusterRestore() throws Exception {

// Step - 3 Trigger full cluster restore and validate
validateMetadata(List.of(INDEX_NAME));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

public void testFullClusterRestoreMultipleIndices() throws Exception {
Expand Down Expand Up @@ -112,8 +119,8 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {

// Step - 3 Trigger full cluster restore
validateMetadata(List.of(INDEX_NAME, secondIndexName));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRestoredData(indexStats2, secondIndexName, false);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);
verifyRedIndicesAndTriggerRestore(indexStats2, secondIndexName, false);
assertTrue(INDEX_READ_ONLY_SETTING.get(clusterService().state().metadata().index(secondIndexName).getSettings()));
assertThrows(ClusterBlockException.class, () -> indexSingleDoc(secondIndexName));
// Test is complete
Expand Down Expand Up @@ -181,7 +188,7 @@ public void testRemoteStateFullRestart() throws Exception {
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert Objects.equals(newClusterUUID, prevClusterUUID) : "Full restart not successful. cluster uuid has changed";
validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

private void validateMetadata(List<String> indexNames) {
Expand Down Expand Up @@ -246,19 +253,18 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception {

// Step - 3 Trigger full cluster restore and validate
// validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories();
verifyRestoredIndexTemplate();
assertEquals(Integer.valueOf(34), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(clusterService().state().metadata().settings()));
assertEquals(true, SETTING_READ_ONLY_SETTING.get(clusterService().state().metadata().settings()));
assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
// Test is complete

// Remote the cluster read only block to ensure proper cleanup
updatePersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build());
assertFalse(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));

verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories();
verifyRestoredIndexTemplate();
}

private void registerCustomRepository() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,12 @@ public Builder initializeAsRemoteStoreRestore(
}
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
if (forceRecoverAllPrimaries == false && indexShardRoutingTableMap.containsKey(shardId) == false) {
if (indexShardRoutingTableMap.containsKey(shardId) == false) {
throw new IllegalStateException("IndexShardRoutingTable is not present for shardId: " + shardId);
}
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingTableMap.get(shardId);
if (forceRecoverAllPrimaries || indexShardRoutingTable == null || indexShardRoutingTable.primaryShard().unassigned()) {
if (forceRecoverAllPrimaries || indexShardRoutingTable.primaryShard().unassigned()) {
// Primary shard to be recovered from remote store.
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo));
// All the replica shards to be recovered from peer recovery.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.settings.ClusterSettings;

Expand Down Expand Up @@ -121,21 +120,7 @@ static ClusterState updateRoutingTable(final ClusterState state) {
// initialize all index routing tables as empty
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(state.routingTable());
for (final IndexMetadata cursor : state.metadata().indices().values()) {
// Whether IndexMetadata is recovered from local disk or remote it doesn't matter to us at this point.
// We are only concerned about index data recovery here. Which is why we only check for remote store enabled and not for remote
// cluster state enabled.
if (cursor.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false
|| state.routingTable().hasIndex(cursor.getIndex()) == false
|| state.routingTable()
.index(cursor.getIndex())
.shardsMatchingPredicateCount(
shardRouting -> shardRouting.primary()
// We need to ensure atleast one of the primaries is being recovered from remote.
// This ensures we have gone through the RemoteStoreRestoreService and routing table is updated
&& shardRouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource
) == 0) {
routingTableBuilder.addAsRecovery(cursor);
}
routingTableBuilder.addAsRecovery(cursor);
}
// start with 0 based versions for routing table
routingTableBuilder.version(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,15 @@ private RemoteRestoreResult executeRestore(
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
boolean metadataFromRemoteStore = false;
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) {
String indexName = indexMetadataEntry.getKey();
IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2();
boolean metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
IndexMetadata updatedIndexMetadata = indexMetadata;
if (metadataFromRemoteStore == false && restoreAllShards) {
updatedIndexMetadata = IndexMetadata.builder(indexMetadata)
Expand All @@ -204,27 +205,23 @@ private RemoteRestoreResult executeRestore(

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());

Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = new HashMap<>();
if (metadataFromRemoteStore == false) {
indexShardRoutingTableMap = currentState.routingTable()
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable()
.index(indexName)
.shards()
.values()
.stream()
.collect(Collectors.toMap(IndexShardRoutingTable::shardId, Function.identity()));

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);

rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
}

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(
updatedIndexMetadata,
recoverySource,
indexShardRoutingTableMap,
restoreAllShards || metadataFromRemoteStore
);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(indexName);
Expand All @@ -239,7 +236,10 @@ private RemoteRestoreResult executeRestore(

RoutingTable rt = rtBuilder.build();
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return RemoteRestoreResult.build(restoreUUID, restoreInfo, allocationService.reroute(updatedState, "restored from remote store"));
if (metadataFromRemoteStore == false) {
updatedState = allocationService.reroute(updatedState, "restored from remote store");
}
return RemoteRestoreResult.build(restoreUUID, restoreInfo, updatedState);
}

private void restoreGlobalMetadata(Metadata.Builder mdBuilder, Metadata remoteMetadata) {
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -3545,7 +3546,16 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
// When remote indices are restored via remote metadata,
// the recovery source is REMOTE_STORE and unassigned Reason is CLUSTER_RECOVERED
// During remote index restore from local disk metadata, the unassigned Reason is still CLUSTER_RECOVERED
// but the RecoverySource is not REMOTE_STORE but EXISTING_STORE
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
if (shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.CLUSTER_RECOVERED) {
// At this stage the shard is in INITIALIZING state
logger.info("Cannot start recovery yet!");
} else {
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
}
break;
case PEER:
try {
Expand Down
Loading