diff --git a/CHANGELOG.md b/CHANGELOG.md
index e9df5879182ac..b42d35580add8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
 - Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
 - Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
+- Limit reader writer separation to remote store enabled clusters ([#16760](https://github.com/opensearch-project/OpenSearch/pull/16760))
 
 ### Deprecated
 - Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java
index 5f65d6647f26d..df2620b794686 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java
@@ -14,6 +14,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
 import org.opensearch.test.OpenSearchIntegTestCase;
 
 import java.util.List;
@@ -23,7 +24,7 @@
 import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
+public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {
 
     @Override
     protected Settings featureFlagSettings() {
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationAndRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationAndRecoveryIT.java
new file mode 100644
index 0000000000000..7d4dd62cdca61
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationAndRecoveryIT.java
@@ -0,0 +1,325 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; + +import java.nio.file.Path; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.routing.RecoverySource.Type.EMPTY_STORE; +import static org.opensearch.cluster.routing.RecoverySource.Type.EXISTING_STORE; +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaReplicationAndRecoveryIT extends SegmentReplicationBaseIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected Path absolutePath; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); + } + + @After + public void teardown() { + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); + } + + public void testReplication() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final int docCount = 10; + for (int i = 0; i < 
docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(docCount, primary, replica); + } + + public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final List nodes = internalCluster().startDataOnlyNodes(2); + createIndex( + INDEX_NAME, + Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .put("number_of_search_only_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureGreen(INDEX_NAME); + + final int docCount = 5; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(docCount, nodes); + + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) + .execute() + .actionGet(); + + // Verify the number of indices + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); + // Verify total shards + assertEquals(2, segmentReplicationStatsResponse.getTotalShards()); + // Verify the number of primary shards + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size()); + + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); + Set replicaStats = perGroupStats.getReplicaStats(); + // Verify the number of replica stats + assertEquals(1, replicaStats.size()); + for (SegmentReplicationShardStats replicaStat : replicaStats) { + assertNotNull(replicaStat.getCurrentReplicationState()); + } + } + + public void testSearchReplicaRecovery() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + // ensure search replicas are only allocated to "replica" node. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", replica)) + .execute() + .actionGet(); + + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + assertRecoverySourceType(replica, EMPTY_STORE); + + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + flush(INDEX_NAME); + waitForSearchableDocs(10, primary, replica); + + // Node stats should show remote download stats as nonzero, use this as a precondition to compare + // post restart. 
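+        // (assertDownloadStats inspects the replica node's RemoteSegmentStats via node stats and asserts on downloadBytesStarted)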
+ assertDownloadStats(replica, true); + NodesStatsResponse nodesStatsResponse; + NodeStats nodeStats; + + internalCluster().restartNode(replica); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + // assert existing store recovery + assertRecoverySourceType(replica, EXISTING_STORE); + assertDownloadStats(replica, false); + } + + public void testRecoveryAfterDocsIndexed() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + assertRecoverySourceType(replica, EMPTY_STORE); + // replica should have downloaded from remote + assertDownloadStats(replica, true); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)) + .get(); + + ensureGreen(INDEX_NAME); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) + .get(); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + internalCluster().restartNode(replica); + + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + assertRecoverySourceType(replica, EXISTING_STORE); + assertDownloadStats(replica, false); + } + + private static void assertRecoverySourceType(String replica, RecoverySource.Type recoveryType) throws InterruptedException, + ExecutionException { + RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(INDEX_NAME)).get(); + for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get(INDEX_NAME)) { + if (recoveryState.getPrimary() == false) { + assertEquals("All SR should be of expected recovery type", recoveryType, recoveryState.getRecoverySource().getType()); + assertEquals("All SR should be on the specified node", replica, recoveryState.getTargetNode().getName()); + } + } + } + + private static void assertDownloadStats(String replica, boolean expectBytesDownloaded) throws InterruptedException, ExecutionException { + NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest(replica)).get(); + assertEquals(1, nodesStatsResponse.getNodes().size()); + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + assertEquals(replica, nodeStats.getNode().getName()); + if (expectBytesDownloaded) { + assertTrue(nodeStats.getIndices().getSegments().getRemoteSegmentStats().getDownloadBytesStarted() > 0); + } else { + assertEquals(0, nodeStats.getIndices().getSegments().getRemoteSegmentStats().getDownloadBytesStarted()); + } + } + + public void testStopPrimary_RestoreOnNewNode() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + assertDocCounts(docCount, primary); + + final String replica = 
internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + assertDocCounts(docCount, replica); + // stop the primary + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get(); + assertEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus()); + }); + assertDocCounts(docCount, replica); + + String restoredPrimary = internalCluster().startDataOnlyNode(); + + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + ensureGreen(INDEX_NAME); + assertDocCounts(docCount, replica, restoredPrimary); + + for (int i = docCount; i < docCount * 2; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + assertBusy(() -> assertDocCounts(20, replica, restoredPrimary)); + } + + public void testFailoverToNewPrimaryWithPollingReplication() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + assertDocCounts(10, replica); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) + .get(); + final String writer_replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + // stop the primary + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get(); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus()); + }); + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get(); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus()); + assertDocCounts(10, replica); + + for (int i = docCount; i < docCount * 2; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + assertBusy(() -> assertDocCounts(20, replica, writer_replica)); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java deleted file mode 100644 index f660695af9965..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.indices.replication; - -import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.index.SegmentReplicationPerGroupStats; -import org.opensearch.index.SegmentReplicationShardStats; -import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.After; -import org.junit.Before; - -import java.nio.file.Path; -import java.util.List; -import java.util.Set; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT { - - private static final String REPOSITORY_NAME = "test-remote-store-repo"; - protected Path absolutePath; - - private Boolean useRemoteStore; - - @Before - public void randomizeRemoteStoreEnabled() { - useRemoteStore = randomBoolean(); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - if (useRemoteStore) { - if (absolutePath == null) { - absolutePath = randomRepoPath().toAbsolutePath(); - } - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) - .build(); - } - return super.nodeSettings(nodeOrdinal); - } - - @After - public void teardown() { - if (useRemoteStore) { - clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); - } - } - - @Override - public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build(); - } - - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); - } - - public void testReplication() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final String primary = internalCluster().startDataOnlyNode(); - createIndex(INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startDataOnlyNode(); - ensureGreen(INDEX_NAME); - - final int docCount = 10; - for (int i = 0; i < docCount; i++) { - client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); - } - refresh(INDEX_NAME); - waitForSearchableDocs(docCount, primary, replica); - } - - public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final List nodes = internalCluster().startDataOnlyNodes(2); - createIndex( - INDEX_NAME, - Settings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - .put("number_of_search_only_replicas", 1) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build() - ); - ensureGreen(INDEX_NAME); - - final int docCount = 5; - for (int i = 0; i < docCount; i++) { - client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); - } - refresh(INDEX_NAME); - waitForSearchableDocs(docCount, nodes); - - SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() - .indices() - .prepareSegmentReplicationStats(INDEX_NAME) - 
.setDetailed(true) - .execute() - .actionGet(); - - // Verify the number of indices - assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); - // Verify total shards - assertEquals(2, segmentReplicationStatsResponse.getTotalShards()); - // Verify the number of primary shards - assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size()); - - SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); - Set replicaStats = perGroupStats.getReplicaStats(); - // Verify the number of replica stats - assertEquals(1, replicaStats.size()); - for (SegmentReplicationShardStats replicaStat : replicaStats) { - assertNotNull(replicaStat.getCurrentReplicationState()); - } - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java index 352332b962c92..e8d65e07c7dd9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java @@ -15,7 +15,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.remotestore.RemoteSnapshotIT; import org.opensearch.snapshots.SnapshotRestoreException; import org.opensearch.test.OpenSearchIntegTestCase; @@ -26,7 +26,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SearchReplicaRestoreIT extends AbstractSnapshotIntegTestCase { +public class SearchReplicaRestoreIT extends RemoteSnapshotIT { private static final String INDEX_NAME = "test-idx-1"; private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored"; @@ -40,49 +40,6 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); } - public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnDocRepWithSearchReplica() throws Exception { - bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT); - createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); - - SnapshotRestoreException exception = expectThrows( - SnapshotRestoreException.class, - () -> restoreSnapshot( - REPOSITORY_NAME, - SNAPSHOT_NAME, - INDEX_NAME, - RESTORED_INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ) - ); - assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.DOCUMENT, ReplicationType.DOCUMENT))); - } - - public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnSegRepWithSearchReplica() throws Exception { - bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT); - createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); - - restoreSnapshot( - REPOSITORY_NAME, - SNAPSHOT_NAME, - INDEX_NAME, - RESTORED_INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - 
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ); - ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME); - internalCluster().startDataOnlyNode(); - ensureGreen(RESTORED_INDEX_NAME); - assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME)); - - SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); - assertHitCount(resp, DOC_COUNT); - } - public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnDocRepWithSearchReplica() throws Exception { bootstrapIndexWithOutSearchReplicas(ReplicationType.SEGMENT); createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); @@ -140,27 +97,6 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.SEGMENT, ReplicationType.DOCUMENT))); } - public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRepWithNoSearchReplica() throws Exception { - bootstrapIndexWithSearchReplicas(); - createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME); - - restoreSnapshot( - REPOSITORY_NAME, - SNAPSHOT_NAME, - INDEX_NAME, - RESTORED_INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) - .build() - ); - ensureGreen(RESTORED_INDEX_NAME); - assertEquals(0, getNumberOfSearchReplicas(RESTORED_INDEX_NAME)); - - SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); - assertHitCount(resp, DOC_COUNT); - } - private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException { startCluster(2); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java index fa836e2cc5784..f524f4d1298c1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java @@ -20,6 +20,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -31,7 +32,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase { +public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase { private static final String TEST_INDEX = "test_index"; @@ -55,35 +56,6 @@ public Settings indexSettings() { .build(); } - public void testCreateDocRepFails() { - Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); - - IllegalArgumentException illegalArgumentException = expectThrows( - IllegalArgumentException.class, - () -> createIndex(TEST_INDEX, settings) - ); - assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); - } - - public void testUpdateDocRepFails() { - Settings settings = Settings.builder() - 
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) - .build(); - // create succeeds - createIndex(TEST_INDEX, settings); - - // update fails - IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> { - client().admin() - .indices() - .prepareUpdateSettings(TEST_INDEX) - .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) - .get(); - }); - assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); - } - public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException { int numSearchReplicas = 1; int numWriterReplicas = 1; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 05588620348aa..b9eb2643b0fb5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1103,14 +1103,9 @@ static Settings aggregateIndexSettings( private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.Builder builder) { if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(builder) && builder.get(SETTING_NUMBER_OF_SEARCH_REPLICAS) != null) { if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings) > 0 - && ReplicationType.parseString(builder.get(INDEX_REPLICATION_TYPE_SETTING.getKey())).equals(ReplicationType.DOCUMENT)) { + && Boolean.parseBoolean(builder.get(SETTING_REMOTE_STORE_ENABLED)) == false) { throw new IllegalArgumentException( - "To set " - + SETTING_NUMBER_OF_SEARCH_REPLICAS - + ", " - + INDEX_REPLICATION_TYPE_SETTING.getKey() - + " must be set to " - + ReplicationType.SEGMENT + "To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true" ); } builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 4f0c0f3aa2ab4..ccaa940a0ca9d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -63,7 +63,6 @@ import org.opensearch.index.IndexSettings; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -77,8 +76,8 @@ import java.util.Set; import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext; -import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings; @@ -539,14 +538,12 @@ public ClusterState 
execute(ClusterState currentState) { private void validateSearchReplicaCountSettings(Settings requestSettings, Index[] indices, ClusterState currentState) { final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings); if (updatedNumberOfSearchReplicas > 0) { - if (Arrays.stream(indices).allMatch(index -> currentState.metadata().isSegmentReplicationEnabled(index.getName())) == false) { + if (Arrays.stream(indices) + .allMatch( + index -> currentState.metadata().index(index.getName()).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false) + ) == false) { throw new IllegalArgumentException( - "To set " - + SETTING_NUMBER_OF_SEARCH_REPLICAS - + ", " - + INDEX_REPLICATION_TYPE_SETTING.getKey() - + " must be set to " - + ReplicationType.SEGMENT + "To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true" ); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index b46efad9207c1..3be63cd81b362 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -145,7 +145,10 @@ boolean validate(Metadata metadata) { "Shard [" + indexShardRoutingTable.shardId().id() + "] routing table has wrong number of replicas, expected [" + + "Replicas: " + indexMetadata.getNumberOfReplicas() + + "Search Replicas: " + + indexMetadata.getNumberOfSearchOnlyReplicas() + "], got [" + routingNumberOfReplicas + "]" @@ -513,15 +516,31 @@ public Builder initializeAsRemoteStoreRestore( ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) ); } + // if writers are red we do not want to re-recover search only shards if already assigned. + for (ShardRouting shardRouting : indexShardRoutingTable.searchOnlyReplicas()) { + if (shardRouting.unassigned()) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) + ); + } else { + indexShardRoutingBuilder.addShard(shardRouting); + } + } } else { // Primary is either active or initializing. Do not trigger restore. indexShardRoutingBuilder.addShard(indexShardRoutingTable.primaryShard()); // Replica, if unassigned, trigger peer recovery else no action. 
for (ShardRouting shardRouting : indexShardRoutingTable.replicaShards()) { if (shardRouting.unassigned()) { - indexShardRoutingBuilder.addShard( - ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) - ); + if (shardRouting.isSearchOnly()) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) + ); + } else { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) + ); + } } else { indexShardRoutingBuilder.addShard(shardRouting); } @@ -574,13 +593,7 @@ private Builder initializeAsRestore( } for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) { indexShardRoutingBuilder.addShard( - ShardRouting.newUnassigned( - shardId, - false, - true, - PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled - unassignedInfo - ) + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) ); } shards.put(shardNumber, indexShardRoutingBuilder.build()); @@ -623,13 +636,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas } for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) { indexShardRoutingBuilder.addShard( - ShardRouting.newUnassigned( - shardId, - false, - true, - PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled - unassignedInfo - ) + ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo) ); } shards.put(shardNumber, indexShardRoutingBuilder.build()); @@ -664,7 +671,7 @@ public Builder addSearchReplica() { shardId, false, true, - PeerRecoverySource.INSTANCE, // TODO: Change to remote store if enabled + EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null) ); shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index e4ed65683c5eb..cd53462843dbc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -135,6 +135,23 @@ public class IndexShardRoutingTable extends AbstractDiffable assignedShards = newShardRoutingTable.assignedShards() .stream() .filter(s -> s.isRelocationTarget() == false) + .filter(s -> s.isSearchOnly() == false) // do not consider search only shards for in sync validation .collect(Collectors.toList()); assert assignedShards.size() <= maxActiveShards : "cannot have more assigned shards " + assignedShards diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 4bde1e282fe78..32639bc3065da 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -191,7 +191,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } } else { // Peer recovery - assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == 
RecoverySource.Type.PEER; + assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER + || shardRouting.isSearchOnly(); if (shardRouting.unassignedReasonIndexCreated()) { return allocateInitialShardCopies(shardRouting, node, allocation); @@ -204,7 +205,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing private Decision allocateInitialShardCopies(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { int currentInRecoveries = allocation.routingNodes().getInitialIncomingRecoveries(node.nodeId()); assert shardRouting.unassignedReasonIndexCreated() && !shardRouting.primary(); - return allocateShardCopies( shardRouting, allocation, @@ -212,7 +212,8 @@ private Decision allocateInitialShardCopies(ShardRouting shardRouting, RoutingNo replicasInitialRecoveries, this::getInitialPrimaryNodeOutgoingRecoveries, replicasInitialRecoveries, - true + true, + node ); } @@ -228,7 +229,8 @@ private Decision allocateNonInitialShardCopies(ShardRouting shardRouting, Routin concurrentIncomingRecoveries, this::getPrimaryNodeOutgoingRecoveries, concurrentOutgoingRecoveries, - false + false, + node ); } @@ -249,7 +251,8 @@ private Decision allocateShardCopies( int inRecoveriesLimit, BiFunction primaryNodeOutRecoveriesFunc, int outRecoveriesLimit, - boolean isInitialShardCopies + boolean isInitialShardCopies, + RoutingNode candidateNode ) { // Allocating a shard to this node will increase the incoming recoveries if (currentInRecoveries >= inRecoveriesLimit) { @@ -274,6 +277,16 @@ private Decision allocateShardCopies( ); } } else { + // if this is a search shard that recovers from remote store, ignore outgoing recovery limits. + if (shardRouting.isSearchOnly() && candidateNode.node().isRemoteStoreNode()) { + return allocation.decision( + YES, + NAME, + "Remote based search replica below incoming recovery limit: [%d < %d]", + currentInRecoveries, + inRecoveriesLimit + ); + } // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); if (primaryShard == null) { @@ -319,6 +332,10 @@ private Decision allocateShardCopies( } } + private static boolean isRemoteStoreNode(ShardRouting shardRouting, RoutingAllocation allocation) { + return allocation.nodes().getNodes().get(shardRouting.currentNodeId()).isRemoteStoreNode(); + } + /** * The shard routing passed to {@link #canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} is not the initializing shard to this * node but: @@ -357,9 +374,18 @@ private ShardRouting initializingShard(ShardRouting shardRouting, String current @Override public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) { int outgoingRecoveries = 0; - if (!shardRouting.primary() && !shardRouting.isSearchOnly()) { + if (!shardRouting.primary()) { ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); - outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId()); + if (primaryShard != null) { + outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId()); + } else { + assert shardRouting.isSearchOnly(); + // check if the moving away search replica is using remote store, if not + // throw an error as the primary it will use for recovery is not active. 
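+                // (the rejection is returned as a Decision.NO below, so the move is simply disallowed)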
+ if (isRemoteStoreNode(shardRouting, allocation) == false) { + return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); + } + } } else { outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(shardRouting.currentNodeId()); } diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 9bffaedcbf482..80275f27bbcb9 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -633,6 +633,7 @@ public synchronized void renewPeerRecoveryRetentionLeases() { */ final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false) .filter(ShardRouting::assignedToNode) + .filter(r -> r.isSearchOnly() == false) .anyMatch(shardRouting -> { final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); if (retentionLease == null) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c0d9f1bca1223..670fe8563d51c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2547,22 +2547,24 @@ public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOEx */ public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); - loadGlobalCheckpointToReplicationTracker(); - innerOpenEngineAndTranslog(replicationTracker); - getEngine().skipTranslogRecovery(); + openEngineAndSkipTranslogRecovery(true); } public void openEngineAndSkipTranslogRecoveryFromSnapshot() throws IOException { - assert routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT : "not a snapshot recovery [" - + routingEntry() - + "]"; + assert routingEntry().isSearchOnly() || routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT + : "not a snapshot recovery [" + routingEntry() + "]"; recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); maybeCheckIndex(); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + openEngineAndSkipTranslogRecovery(routingEntry().isSearchOnly()); + } + + void openEngineAndSkipTranslogRecovery(boolean syncFromRemote) throws IOException { recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); loadGlobalCheckpointToReplicationTracker(); - innerOpenEngineAndTranslog(replicationTracker, false); + innerOpenEngineAndTranslog(replicationTracker, syncFromRemote); + assert routingEntry().isSearchOnly() == false || translogStats().estimatedNumberOfOperations() == 0 + : "Translog is expected to be empty but holds " + translogStats().estimatedNumberOfOperations() + "Operations."; getEngine().skipTranslogRecovery(); } @@ -2912,7 +2914,8 @@ public void recoverFromLocalShards( public void recoverFromStore(ActionListener listener) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists - assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + assert shardRouting.primary() || shardRouting.isSearchOnly() + : "recover from store only makes sense if the shard 
is a primary shard or an untracked search only replica"; assert shardRouting.initializing() : "can only start recovery on initializing shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); storeRecovery.recoverFromStore(this, listener); diff --git a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java index ccfaf50da1c6b..b2db48737ee3f 100644 --- a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java @@ -67,15 +67,17 @@ public ReplicationGroup( this.inSyncAllocationIds = inSyncAllocationIds; this.trackedAllocationIds = trackedAllocationIds; this.version = version; - this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); this.replicationTargets = new ArrayList<>(); this.skippedShards = new ArrayList<>(); for (final ShardRouting shard : routingTable) { - // search only replicas never receive any replicated operations if (shard.unassigned() || shard.isSearchOnly()) { assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard; skippedShards.add(shard); + if (shard.isSearchOnly()) { + assert shard.allocationId() == null || inSyncAllocationIds.contains(shard.allocationId().getId()) == false + : " Search replicas should not be part of the inSync id set"; + } } else { if (trackedAllocationIds.contains(shard.allocationId().getId())) { replicationTargets.add(shard); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 494fe0dbef803..e9da9d2159b17 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -545,7 +545,7 @@ private boolean canRecover(IndexShard indexShard) { // got closed on us, just ignore this recovery return false; } - if (indexShard.routingEntry().primary() == false) { + if (indexShard.routingEntry().primary() == false && indexShard.routingEntry().isSearchOnly() == false) { throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null); } return true; @@ -748,7 +748,17 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); indexShard.recoveryState().getIndex().setFileDetailsComplete(); } - indexShard.openEngineAndRecoverFromTranslog(); + if (indexShard.routingEntry().isSearchOnly() == false) { + indexShard.openEngineAndRecoverFromTranslog(); + } else { + // Opens the engine for pull based replica copies that are + // not primary eligible. This will skip any checkpoint tracking and ensure + // that the shards are sync'd with remote store before opening. + // + // first bootstrap new history / translog so that the TranslogUUID matches the UUID from the latest commit. 
+ bootstrapForSnapshot(indexShard, store); + indexShard.openEngineAndSkipTranslogRecoveryFromSnapshot(); + } if (indexShard.shouldSeedRemoteStore()) { indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId()); @@ -879,6 +889,7 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), localCheckpoint, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java index 3d11193a07884..81055e01d915b 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java @@ -19,32 +19,46 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.ValidationException; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.env.Environment; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.indices.ShardLimitValidator; import org.opensearch.indices.cluster.ClusterStateChanges; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; public class SearchOnlyReplicaTests extends OpenSearchSingleNodeTestCase { + public static final String TEST_RS_REPO = "test-rs-repo"; + public static final String 
INDEX_NAME = "test-index"; private ThreadPool threadPool; @Before @@ -70,7 +84,7 @@ protected Settings featureFlagSettings() { public void testCreateWithDefaultSearchReplicasSetting() { final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); ClusterState state = createIndexWithSettings(cluster, Settings.builder().build()); - IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(INDEX_NAME).getShards().get(0); assertEquals(1, indexShardRoutingTable.replicaShards().size()); assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(1, indexShardRoutingTable.writerReplicas().size()); @@ -91,53 +105,50 @@ public void testSearchReplicasValidationWithDocumentReplication() { ) ); assertEquals( - "To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", + "To set index.number_of_search_only_replicas, index.remote_store.enabled must be set to true", exception.getCause().getMessage() ); } - public void testUpdateSearchReplicaCount() { - final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + public void testUpdateSearchReplicaCount() throws ExecutionException, InterruptedException { + Settings settings = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + createIndex(INDEX_NAME, settings); - ClusterState state = createIndexWithSettings( - cluster, - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable(); assertEquals(1, indexShardRoutingTable.replicaShards().size()); assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(0, indexShardRoutingTable.writerReplicas().size()); // add another replica - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build() ); - rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + client().admin().indices().updateSettings(updateSettingsRequest).get(); + indexShardRoutingTable = getIndexShardRoutingTable(); assertEquals(2, indexShardRoutingTable.replicaShards().size()); assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(0, indexShardRoutingTable.writerReplicas().size()); // remove all replicas - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) + updateSettingsRequest = new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build() ); - 
rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + client().admin().indices().updateSettings(updateSettingsRequest).get(); + indexShardRoutingTable = getIndexShardRoutingTable(); assertEquals(0, indexShardRoutingTable.replicaShards().size()); assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); assertEquals(0, indexShardRoutingTable.writerReplicas().size()); } + private IndexShardRoutingTable getIndexShardRoutingTable() { + return client().admin().cluster().prepareState().get().getState().getRoutingTable().index(INDEX_NAME).getShards().get(0); + } + private ClusterState createIndexWithSettings(ClusterStateChanges cluster, Settings settings) { List allNodes = new ArrayList<>(); // node for primary/local @@ -149,48 +160,32 @@ private ClusterState createIndexWithSettings(ClusterStateChanges cluster, Settin } ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - CreateIndexRequest request = new CreateIndexRequest("index", settings).waitForActiveShards(ActiveShardCount.NONE); + CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME, settings).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); return state; } public void testUpdateSearchReplicasOverShardLimit() { - final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); - - allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); - - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + Settings settings = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) + .build(); + createIndex(INDEX_NAME, settings); + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - CreateIndexRequest request = new CreateIndexRequest( - "index", - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() + ); // add another replica - ClusterState finalState = state; - Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - expectThrows( - RuntimeException.class, - () -> cluster.updateSettings( - finalState, - new UpdateSettingsRequest("index").settings( - Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() - ) - ) + ExecutionException executionException = expectThrows( + ExecutionException.class, + () -> client().admin().indices().updateSettings(updateSettingsRequest).get() ); + 
Throwable cause = executionException.getCause(); + assertEquals(ValidationException.class, cause.getClass()); } public void testUpdateSearchReplicasOnDocrepCluster() { @@ -206,7 +201,7 @@ public void testUpdateSearchReplicasOnDocrepCluster() { ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); CreateIndexRequest request = new CreateIndexRequest( - "index", + INDEX_NAME, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0) @@ -214,7 +209,7 @@ public void testUpdateSearchReplicasOnDocrepCluster() { .build() ).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); + assertTrue(state.metadata().hasIndex(INDEX_NAME)); rerouteUntilActive(state, cluster); // add another replica @@ -224,7 +219,7 @@ public void testUpdateSearchReplicasOnDocrepCluster() { RuntimeException.class, () -> cluster.updateSettings( finalState, - new UpdateSettingsRequest("index").settings( + new UpdateSettingsRequest(INDEX_NAME).settings( Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() ) ) @@ -232,11 +227,51 @@ public void testUpdateSearchReplicasOnDocrepCluster() { } + Path tempDir = createTempDir(); + Path repo = tempDir.resolve("repo"); + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(buildRemoteStoreNodeAttributes(TEST_RS_REPO, repo)) + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) + .put(Environment.PATH_REPO_SETTING.getKey(), repo) + .build(); + } + + private Settings buildRemoteStoreNodeAttributes(String repoName, Path repoPath) { + String repoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + repoName + ); + String repoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + repoName + ); + + return Settings.builder() + .put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) + .put(repoTypeAttributeKey, FsRepository.TYPE) + .put(repoSettingsAttributeKeyPrefix + "location", repoPath) + .put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) + .put(repoTypeAttributeKey, FsRepository.TYPE) + .put(repoSettingsAttributeKeyPrefix + "location", repoPath) + .put("node.attr." 
+ REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) + .put(repoTypeAttributeKey, FsRepository.TYPE) + .put(repoSettingsAttributeKeyPrefix + "location", repoPath) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false) + .build(); + } + private static void rerouteUntilActive(ClusterState state, ClusterStateChanges cluster) { - while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + while (state.routingTable().index(INDEX_NAME).shard(0).allShardsStarted() == false) { state = cluster.applyStartedShards( state, - state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING) + state.routingTable().index(INDEX_NAME).shard(0).shardsWithState(ShardRoutingState.INITIALIZING) ); state = cluster.reroute(state, new ClusterRerouteRequest()); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java index 8d4f4cdee26cc..9604e82fe4c88 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java @@ -8,27 +8,44 @@ package org.opensearch.cluster.routing.allocation.decider; +import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.EmptyClusterInfoService; import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.command.AllocationCommands; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static 
org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; public class SearchReplicaAllocationDeciderTests extends OpenSearchAllocationTestCase { @@ -130,4 +147,171 @@ public void testSearchReplicaRoutingDedicatedIncludes() { decision = (Decision.Single) filterAllocationDecider.canRemain(primary, state.getRoutingNodes().node("node1"), allocation); assertEquals(decision.toString(), Decision.Type.YES, decision.type()); } + + public void testSearchReplicaWithThrottlingDecider_PrimaryBasedReplication() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + // throttle outgoing on primary + AllocationService strategy = createAllocationService(Settings.EMPTY, gatewayAllocator); + + Set<Setting<?>> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .numberOfSearchReplicas(1) + ) + .build(); + + ClusterState clusterState = initializeClusterStateWithSingleIndexAndShard(newNode("node1"), metadata, gatewayAllocator); + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node2"))).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertEquals(2, clusterState.routingTable().shardsWithState(STARTED).size()); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + // start a third node, we will try and move the SR to this node + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + // remove the primary and reroute - this would throw an NPE for search replicas but *not* regular. + // regular replicas would get promoted to primary before the CanMoveAway call.
+ clusterState = strategy.disassociateDeadNodes( + ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(), + true, + "test" + ); + + // attempt to move the replica + AllocationService.CommandsResult commandsResult = strategy.reroute( + clusterState, + new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), + true, + false + ); + + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.NO); + boolean isCorrectNoDecision = false; + for (Decision decision : commandsResult.explanations().explanations().get(0).decisions().getDecisions()) { + if (decision.label().equals(ThrottlingAllocationDecider.NAME)) { + assertEquals("primary shard for this replica is not yet active", decision.getExplanation()); + assertEquals(Decision.Type.NO, decision.type()); + isCorrectNoDecision = true; + } + } + assertTrue(isCorrectNoDecision); + } + + public void testSearchReplicaWithThrottlingDeciderWithoutPrimary_RemoteStoreEnabled() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + AllocationService strategy = createAllocationService(Settings.EMPTY, gatewayAllocator); + Set<Setting<?>> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .numberOfSearchReplicas(1) + ) + .build(); + + ClusterState clusterState = initializeClusterStateWithSingleIndexAndShard(newRemoteNode("node1"), metadata, gatewayAllocator); + + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + DiscoveryNode node2 = newRemoteNode("node2"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(node2)).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertEquals(2, clusterState.routingTable().shardsWithState(STARTED).size()); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + // start a third node, we will try and move the SR to this node + DiscoveryNode node3 = newRemoteNode("node3"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(node3)).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + // remove the primary and reroute - this would throw an NPE for search replicas but *not* regular. + // regular replicas would get promoted to primary before the CanMoveAway call.
+ clusterState = strategy.disassociateDeadNodes( + ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(), + true, + "test" + ); + + // attempt to move the replica + AllocationService.CommandsResult commandsResult = strategy.reroute( + clusterState, + new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), + true, + false + ); + + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.NO); + boolean foundYesMessage = false; + for (Decision decision : commandsResult.explanations().explanations().get(0).decisions().getDecisions()) { + if (decision.label().equals(ThrottlingAllocationDecider.NAME)) { + assertEquals("Remote based search replica below incoming recovery limit: [0 < 2]", decision.getExplanation()); + assertEquals(Decision.Type.YES, decision.type()); + foundYesMessage = true; + } + } + assertTrue(foundYesMessage); + } + + private ClusterState initializeClusterStateWithSingleIndexAndShard( + DiscoveryNode primaryNode, + Metadata metadata, + TestGatewayAllocator gatewayAllocator + ) { + Metadata.Builder metadataBuilder = new Metadata.Builder(metadata); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + IndexMetadata indexMetadata = metadata.index("test"); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata); + initializePrimaryAndMarkInSync(indexMetadata.getIndex(), indexMetadataBuilder, gatewayAllocator, primaryNode); + routingTableBuilder.addAsRecovery(indexMetadata); + metadataBuilder.put(indexMetadata, false); + RoutingTable routingTable = routingTableBuilder.build(); + return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(primaryNode)) + .metadata(metadataBuilder.build()) + .routingTable(routingTable) + .build(); + } + + private void initializePrimaryAndMarkInSync( + Index index, + IndexMetadata.Builder indexMetadata, + TestGatewayAllocator gatewayAllocator, + DiscoveryNode primaryNode + ) { + final ShardRouting unassigned = ShardRouting.newUnassigned( + new ShardId(index, 0), + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") + ); + ShardRouting started = ShardRoutingHelper.moveToStarted(ShardRoutingHelper.initialize(unassigned, primaryNode.getId())); + indexMetadata.putInSyncAllocationIds(0, Collections.singleton(started.allocationId().getId())); + gatewayAllocator.addKnownAllocation(started); + } + + private static DiscoveryNode newRemoteNode(String name) { + return newNode( + name, + name, + Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "cluster-repo", + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, + "segment-repo", + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, + "translog-repo" + ) + ); + } } diff --git a/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java b/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java index dd2fb51151a5b..d85ed10eeeae7 100644 --- a/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java +++ b/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingTable; +import 
org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; @@ -489,4 +490,146 @@ public void testHideStateIfNotRecovered() { assertFalse(hiddenState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_READ_ONLY_BLOCK)); } + public void testRemoteRestoreWithSearchOnlyShards() { + final int numOfShards = 10; + final int numAssignedSearchReplicas = 5; + final int numOfSearchReplicas = 1; + + final IndexMetadata remoteMetadata = createIndexMetadata( + "test-remote", + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numOfSearchReplicas) + .build() + ); + // create an initial routing table where all search replicas exist and are assigned, they should get included as is in the restored + // routing. + final Index index = remoteMetadata.getIndex(); + + Map routingTable = new HashMap<>(); + for (int shardNumber = 0; shardNumber < remoteMetadata.getNumberOfShards(); shardNumber++) { + ShardId shardId = new ShardId(index, shardNumber); + final String nodeId = "node " + shardNumber; + IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder( + new ShardId(remoteMetadata.getIndex(), shardId.id()) + ); + // add a search replica for the shard + ShardRouting searchReplicaRouting = ShardRouting.newUnassigned( + shardId, + false, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") + ); + if (shardNumber < numAssignedSearchReplicas) { + // first five shards add the SR as assigned + builder.addShard(searchReplicaRouting.initialize(nodeId, null, 0L)); + } else { + builder.addShard(searchReplicaRouting); + } + routingTable.put(shardId, builder.build()); + } + IndexRoutingTable.Builder routingTableAfterRestore = new IndexRoutingTable.Builder(remoteMetadata.getIndex()) + .initializeAsRemoteStoreRestore( + remoteMetadata, + new RecoverySource.RemoteStoreRecoverySource( + UUIDs.randomBase64UUID(), + remoteMetadata.getCreationVersion(), + new IndexId(remoteMetadata.getIndex().getName(), remoteMetadata.getIndexUUID()) + ), + routingTable, + true + ); + for (IndexShardRoutingTable indexShardRoutingTable : routingTableAfterRestore.build()) { + assertEquals(numOfSearchReplicas, indexShardRoutingTable.searchOnlyReplicas().size()); + for (ShardRouting shardRouting : indexShardRoutingTable.searchOnlyReplicas()) { + if (shardRouting.shardId().getId() < numAssignedSearchReplicas) { + assertTrue(shardRouting.assignedToNode()); + assertTrue(containsSameRouting(routingTable.get(indexShardRoutingTable.getShardId()), shardRouting)); + } else { + assertTrue(shardRouting.unassigned()); + assertFalse(containsSameRouting(routingTable.get(indexShardRoutingTable.getShardId()), shardRouting)); + } + } + } + } + + private boolean containsSameRouting(IndexShardRoutingTable oldRoutingTable, ShardRouting shardRouting) { + return oldRoutingTable.searchOnlyReplicas().stream().anyMatch(r -> r.isSameAllocation(shardRouting)); + } + + public void testRemoteRestoreWithActivePrimaryAndSearchOnlyShards() { + final int numOfShards = 10; + final int numAssignedSearchReplicas = 5; + final int numOfSearchReplicas = 1; + + final IndexMetadata remoteMetadata = createIndexMetadata( + "test-remote", + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, 
true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numOfSearchReplicas) + .build() + ); + // create an initial routing table where all search replicas exist and are assigned, they should get included as is in the restored + // routing. + final Index index = remoteMetadata.getIndex(); + + Map routingTable = new HashMap<>(); + for (int shardNumber = 0; shardNumber < remoteMetadata.getNumberOfShards(); shardNumber++) { + ShardId shardId = new ShardId(index, shardNumber); + final String nodeId = "node " + shardNumber; + IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder( + new ShardId(remoteMetadata.getIndex(), shardId.id()) + ); + // add the primary as assigned + ShardRouting primary = ShardRouting.newUnassigned( + shardId, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") + ); + builder.addShard(primary.initialize(nodeId + " Primary", null, 0L)); + + // add a search replica for the shard + ShardRouting searchReplicaRouting = ShardRouting.newUnassigned( + shardId, + false, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") + ); + if (shardNumber < numAssignedSearchReplicas) { + // first five shards add the SR as assigned + builder.addShard(searchReplicaRouting.initialize(nodeId, null, 0L)); + } else { + builder.addShard(searchReplicaRouting); + } + routingTable.put(shardId, builder.build()); + } + IndexRoutingTable.Builder routingTableAfterRestore = new IndexRoutingTable.Builder(remoteMetadata.getIndex()) + .initializeAsRemoteStoreRestore( + remoteMetadata, + new RecoverySource.RemoteStoreRecoverySource( + UUIDs.randomBase64UUID(), + remoteMetadata.getCreationVersion(), + new IndexId(remoteMetadata.getIndex().getName(), remoteMetadata.getIndexUUID()) + ), + routingTable, + false + ); + for (IndexShardRoutingTable indexShardRoutingTable : routingTableAfterRestore.build()) { + assertEquals(numOfSearchReplicas, indexShardRoutingTable.searchOnlyReplicas().size()); + for (ShardRouting shardRouting : indexShardRoutingTable.searchOnlyReplicas()) { + if (shardRouting.shardId().getId() < numAssignedSearchReplicas) { + assertTrue(shardRouting.assignedToNode()); + assertTrue(containsSameRouting(routingTable.get(indexShardRoutingTable.getShardId()), shardRouting)); + } else { + assertTrue(shardRouting.unassigned()); + assertFalse(containsSameRouting(routingTable.get(indexShardRoutingTable.getShardId()), shardRouting)); + } + } + } + } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 19569e1a19284..0268fafdfd246 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2949,6 +2949,52 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep closeShards(target); } + public void testRestoreSearchOnlyShardFromStore() throws IOException { + // this test indexes docs on a primary, refreshes, then recovers a new Search Replica and asserts + // all docs are present + String remoteStorePath = createTempDir().toString(); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + 
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test") + .build(); + IndexShard primary = newStartedShard(true, settings, new InternalEngineFactory()); + indexDoc(primary, "_doc", "1"); + indexDoc(primary, "_doc", "2"); + primary.refresh("test"); + assertDocs(primary, "1", "2"); + + ShardRouting searchReplicaShardRouting = TestShardRouting.newShardRouting( + primary.shardId, + randomAlphaOfLength(10), + false, + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + IndexShard replica = newShard(searchReplicaShardRouting, settings, new NRTReplicationEngineFactory()); + recoverShardFromStore(replica); + searchReplicaShardRouting = replica.routingEntry(); + assertDocs(replica, "1", "2"); + assertEquals( + primary.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replica.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + + // move to unassigned while the replica is active, then reinit from existing store. + searchReplicaShardRouting = ShardRoutingHelper.moveToUnassigned( + searchReplicaShardRouting, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so") + ); + searchReplicaShardRouting = ShardRoutingHelper.initialize(searchReplicaShardRouting, replica.routingEntry().currentNodeId()); + assertEquals(RecoverySource.ExistingStoreRecoverySource.INSTANCE, searchReplicaShardRouting.recoverySource()); + replica = reinitShard(replica, searchReplicaShardRouting); + recoverShardFromStore(replica); + assertDocs(replica, "1", "2"); + closeShards(primary, replica); + } + public void testReaderWrapperIsUsed() throws IOException { IndexShard shard = newStartedShard(true); indexDoc(shard, "_doc", "0", "{\"foo\" : \"bar\"}"); diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java index 9a000a4eeda72..a6af658be2ca1 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java @@ -342,4 +342,26 @@ public static ShardRouting newShardRouting( -1 ); } + + public static ShardRouting newShardRouting( + ShardId shardId, + String currentNodeId, + boolean primary, + boolean searchOnly, + ShardRoutingState state, + RecoverySource recoverySource + ) { + return new ShardRouting( + shardId, + currentNodeId, + null, + primary, + searchOnly, + state, + recoverySource, + buildUnassignedInfo(state), + buildAllocationId(state), + -1 + ); + } }
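Note: the new TestShardRouting.newShardRouting overload added in the last hunk is what lets tests construct a search-only replica routing directly, as IndexShardTests.testRestoreSearchOnlyShardFromStore does above. A minimal, hypothetical usage sketch follows; the index name, shard id, and node id are illustrative placeholders and not part of the change, and the snippet assumes the usual test imports (Index, ShardId, ShardRouting, ShardRoutingState, RecoverySource, TestShardRouting).

    // Build an initializing search-only replica routing for shard 0 of a placeholder index.
    ShardId shardId = new ShardId(new Index("test-index", "_na_"), 0);
    ShardRouting searchReplica = TestShardRouting.newShardRouting(
        shardId,
        "node-1",                                          // placeholder current node id
        false,                                             // primary = false
        true,                                              // searchOnly = true
        ShardRoutingState.INITIALIZING,
        RecoverySource.EmptyStoreRecoverySource.INSTANCE
    );
    // The resulting routing is a non-primary, initializing shard that tests can hand to
    // newShard(...)/reinitShard(...) style helpers when exercising search replica recovery.
    assertFalse(searchReplica.primary());
    assertEquals(ShardRoutingState.INITIALIZING, searchReplica.state());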