diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 792ebe27..3790c3c0 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getCustomTranslogDeletionPolicyFactory(): Optional { // We don't need a retention lease translog deletion policy for remote store enabled clusters as // we fetch the operations directly from lucene in such cases. - return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) { + return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) { Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier -> ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier) }) diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt index 0f00774d..4e364906 100644 --- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt @@ -77,7 +77,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus indexMetric.lastFetchTime.set(relativeStartNanos) val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id) - val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteStoreEnabledCluster(clusterService) || ValidationUtil.isRemoteMigrating(clusterService) + val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService) if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) { // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller @@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator { val shardIt = state.routingTable().shardRoutingTable(request.request().shardId) // Random active shards - return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) || ValidationUtil.isRemoteMigrating(clusterService)) shardIt.primaryShardIt() + return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt() else shardIt.activeInitializingShardsRandomIt() } } \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt index 384a53f5..91d012c2 100644 --- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt +++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt @@ -161,7 +161,8 @@ object ValidationUtil { return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false } - fun isRemoteMigrating(clusterService: ClusterService): Boolean { - return clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED) + fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean { + return isRemoteStoreEnabledCluster(clusterService) || + clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED) } }