From 3e230b703b4fb224dfbde5b2a5d539839a2175a6 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Mon, 15 Apr 2024 09:51:48 +0530 Subject: [PATCH 1/2] Reject different version node join in mixed mode Signed-off-by: Lakshya Taragi --- .../coordination/JoinTaskExecutor.java | 25 ++++++-- .../coordination/JoinTaskExecutorTests.java | 64 +++++++++++++++++++ 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 5d896e392e6bc..05202d951cab4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -520,12 +521,24 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod ); } } - } else { - if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) { - if (joiningNode.isRemoteStoreNode()) { - Optional remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); - } + } else if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) { + Version joiningNodeVersion = joiningNode.getVersion(); + if (joiningNodeVersion.after(currentNodes.getMaxNodeVersion()) || joiningNodeVersion.before(currentNodes.getMinNodeVersion())) { + boolean isAfter = joiningNodeVersion.after(currentNodes.getMaxNodeVersion()); + String reason = String.format( + Locale.ROOT, + "mixed mode: a %s version [%s] node [%s] is not allowed to join cluster with %s version [%s]", + isAfter ? "higher" : "lower", + joiningNode.getVersion(), + joiningNode, + isAfter ? "maximum" : "minimum", + isAfter ? currentNodes.getMaxNodeVersion() : currentNodes.getMinNodeVersion() + ); + throw new IllegalStateException(reason); + } + if (joiningNode.isRemoteStoreNode()) { + Optional remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); + remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); } } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 5eafe63e63fad..b09a3f92a4143 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -76,6 +76,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.VersionUtils.allOpenSearchVersions; import static org.opensearch.test.VersionUtils.allVersions; import static org.opensearch.test.VersionUtils.maxCompatibleVersion; import static org.opensearch.test.VersionUtils.randomCompatibleVersion; @@ -885,6 +886,69 @@ public void testUpdatesClusterStateWithMultiNodeClusterAndSameRepository() throw validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2); } + public void testNodeJoinInMixedMode() { + Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + + List versions = allOpenSearchVersions(); + assert versions.size() >= 3 : "test requires at least three open search versions"; + Version lowerVersion = versions.get(versions.size() - 3); + Version baseVersion = versions.get(versions.size() - 2); + Version higherVersion = versions.get(versions.size() - 1); + + DiscoveryNode currentNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), baseVersion); + DiscoveryNode currentNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), baseVersion); + DiscoveryNodes currentNodes = DiscoveryNodes.builder() + .add(currentNode1) + .localNodeId(currentNode1.getId()) + .add(currentNode2) + .localNodeId(currentNode2.getId()) + .build(); + + Settings mixedModeCompatibilitySettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) + .build(); + + Metadata metadata = Metadata.builder().persistentSettings(mixedModeCompatibilitySettings).build(); + + boolean joiningNodeIsHigher = randomBoolean(); + + // joining node of a different version (higher or lower) than the current nodes + DiscoveryNode joiningNode1 = new DiscoveryNode( + randomAlphaOfLength(10), + randomAlphaOfLength(10), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + joiningNodeIsHigher ? higherVersion : lowerVersion + ); + final IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode1, currentNodes, metadata) + ); + String reason = String.format( + Locale.ROOT, + "mixed mode: a %s version [%s] node [%s] is not allowed to join cluster with %s version [%s]", + joiningNodeIsHigher ? "higher" : "lower", + joiningNode1.getVersion(), + joiningNode1, + joiningNodeIsHigher ? "maximum" : "minimum", + currentNodes.getMaxNodeVersion() + ); + assertEquals(reason, exception.getMessage()); + + // joining node of the same version as the current nodes + DiscoveryNode joiningNode2 = new DiscoveryNode( + randomAlphaOfLength(10), + randomAlphaOfLength(10), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + baseVersion + ); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode2, currentNodes, metadata); + } + private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories) throws Exception { From 686d1fe5252ede31b3d050623be7d9e976a7b676 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Thu, 18 Apr 2024 13:50:51 +0530 Subject: [PATCH 2/2] Address comments Signed-off-by: Lakshya Taragi --- .../coordination/JoinTaskExecutor.java | 41 ++++++++++--------- .../coordination/JoinTaskExecutorTests.java | 15 +++---- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 05202d951cab4..5475470b81b93 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -31,6 +31,7 @@ package org.opensearch.cluster.coordination; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; @@ -67,6 +68,7 @@ import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; @@ -79,7 +81,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); + } else { + if (MIXED.equals(remoteStoreCompatibilityMode)) { + if (joiningNode.getVersion().after(currentNodes.getMaxNodeVersion())) { + String reason = String.format( + Locale.ROOT, + "remote migration : a node [%s] of higher version [%s] is not allowed to join a cluster with maximum version [%s]", + joiningNode, + joiningNode.getVersion(), + currentNodes.getMaxNodeVersion() + ); + logger.warn(reason); + throw new IllegalStateException(reason); + } + if (joiningNode.isRemoteStoreNode()) { + Optional remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); + remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); + } } } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index b09a3f92a4143..3e343e95f6c4b 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -891,8 +891,7 @@ public void testNodeJoinInMixedMode() { FeatureFlags.initializeFeatureFlags(nodeSettings); List versions = allOpenSearchVersions(); - assert versions.size() >= 3 : "test requires at least three open search versions"; - Version lowerVersion = versions.get(versions.size() - 3); + assert versions.size() >= 2 : "test requires at least two open search versions"; Version baseVersion = versions.get(versions.size() - 2); Version higherVersion = versions.get(versions.size() - 1); @@ -911,16 +910,14 @@ public void testNodeJoinInMixedMode() { Metadata metadata = Metadata.builder().persistentSettings(mixedModeCompatibilitySettings).build(); - boolean joiningNodeIsHigher = randomBoolean(); - - // joining node of a different version (higher or lower) than the current nodes + // joining node of a higher version than the current nodes DiscoveryNode joiningNode1 = new DiscoveryNode( randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(), remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), - joiningNodeIsHigher ? higherVersion : lowerVersion + higherVersion ); final IllegalStateException exception = expectThrows( IllegalStateException.class, @@ -928,11 +925,9 @@ public void testNodeJoinInMixedMode() { ); String reason = String.format( Locale.ROOT, - "mixed mode: a %s version [%s] node [%s] is not allowed to join cluster with %s version [%s]", - joiningNodeIsHigher ? "higher" : "lower", - joiningNode1.getVersion(), + "remote migration : a node [%s] of higher version [%s] is not allowed to join a cluster with maximum version [%s]", joiningNode1, - joiningNodeIsHigher ? "maximum" : "minimum", + joiningNode1.getVersion(), currentNodes.getMaxNodeVersion() ); assertEquals(reason, exception.getMessage());