Skip to content

Commit

Permalink
Prefer replicas on remote nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Apr 15, 2024
1 parent 6bc04b4 commit f1ed40d
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.core.Assertions;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -61,12 +62,15 @@
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;

/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
Expand Down Expand Up @@ -418,6 +422,29 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
.orElse(null);
}

/**
* Returns one active replica shard on a remote node for the given shard id or <code>null</code> if
* no such replica is found.
* <p>
* Since we aim to continue moving forward during remote store migration, replicas already migrated to remote nodes
* are preferred for primary promotion
*/
public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) {
List<ShardRouting> replicaShardsOnRemote = assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter((shr) -> {
RoutingNode nd = node(shr.currentNodeId());
return (nd != null && nd.node().isRemoteStoreNode());
})
.collect(Collectors.toList());
ShardRouting replicaShard = null;
if (replicaShardsOnRemote.isEmpty() == false) {
Random rand = Randomness.get();
replicaShard = replicaShardsOnRemote.get(rand.nextInt(replicaShardsOnRemote.size()));
}
return replicaShard;
}

/**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/
Expand Down Expand Up @@ -735,11 +762,17 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
ShardRouting activeReplica;
if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
} else {
activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
ShardRouting activeReplica = null;
if (isMigratingToRemoteStore(new ClusterSettings(metadata.settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
// we might not find any replica on remote node
activeReplica = activeReplicaOnRemoteNode(failedShard.shardId());
}
if (activeReplica == null) {
if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
} else {
activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
}
}
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,37 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.test.VersionUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -812,4 +822,134 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {
}
}
}

public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
AllocationService allocation = createAllocationService(Settings.builder().build());

// segment replication enabled
Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);

// remote store migration metadata settings
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(4))
.persistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED.mode)
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE.direction)
.build()
)
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(initialRoutingTable)
.build();

ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0);

// add a remote node and start primary shard
Map<String, String> remoteStoreNodeAttributes = Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"
);
DiscoveryNode remoteNode1 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(remoteNode1)).build();
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));

clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));

// add remote and non-remote nodes and start replica shards
DiscoveryNode remoteNode2 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode remoteNode3 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode nonRemoteNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode nonRemoteNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
List<DiscoveryNode> replicaShardNodes = List.of(remoteNode2, remoteNode3, nonRemoteNode1, nonRemoteNode2);

for (int i = 0; i < 4; i++) {
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(replicaShardNodes.get(i)))
.build();

clusterState = allocation.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + i));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));

clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + (i + 1)));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));
}

// fail primary shard
ShardRouting primaryShard0 = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShard0, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica exists on a remote node
assertEquals(4, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting1 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShard0, primaryShardRouting1);
assertTrue(
primaryShardRouting1.currentNodeId().equals(remoteNode2.getId())
|| primaryShardRouting1.currentNodeId().equals(remoteNode3.getId())
);

// fail primary shard again
newState = allocation.applyFailedShard(clusterState, primaryShardRouting1, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica again exists on a remote node
assertEquals(3, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting2 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShardRouting1, primaryShardRouting2);
assertTrue(
primaryShardRouting2.currentNodeId().equals(remoteNode2.getId())
|| primaryShardRouting2.currentNodeId().equals(remoteNode3.getId())
);
assertNotEquals(primaryShardRouting1.currentNodeId(), primaryShardRouting2.currentNodeId());

ShardRouting expectedCandidateForSegRep = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId);

// fail primary shard again
newState = allocation.applyFailedShard(clusterState, primaryShardRouting2, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica exists on a non-remote node
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting3 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShardRouting2, primaryShardRouting3);
assertTrue(
primaryShardRouting3.currentNodeId().equals(nonRemoteNode1.getId())
|| primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId())
);
assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId());
}
}

0 comments on commit f1ed40d

Please sign in to comment.