From 8d45db43efc0d4d222c2c69382ca4ef897558f3c Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Thu, 18 Apr 2024 22:21:56 +0530 Subject: [PATCH] few name changes Signed-off-by: Shubh Sahu --- .../changes/TransportGetChangesAction.kt | 20 +++++++++---------- .../replication/util/ValidationUtil.kt | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt index 8a20b3dd..0f00774d 100644 --- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt @@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus indexMetric.lastFetchTime.set(relativeStartNanos) val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id) - val isRemoteStoreOrMixedModeEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService) || ValidationUtil.isMixedModeEnabledCluster(clusterService) - if (lastGlobalCheckpoint(indexShard, isRemoteStoreOrMixedModeEnabled) < request.fromSeqNo) { + val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteStoreEnabledCluster(clusterService) || ValidationUtil.isRemoteMigrating(clusterService) + if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) { // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller // should catch and start a new poll. @@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced // to the translog, which means we can't return those changes. Return to the caller to retry. // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog - if (lastGlobalCheckpoint(indexShard, isRemoteStoreOrMixedModeEnabled) < request.fromSeqNo) { - assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreOrMixedModeEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreOrMixedModeEnabled)}" } + if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) { + assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" } throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...") } } relativeStartNanos = System.nanoTime() // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo - val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreOrMixedModeEnabled), request.toSeqNo) + val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo) var ops: List = listOf() - var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreOrMixedModeEnabled == false + var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false if(fetchFromTranslog) { try { ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo) @@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus indexMetric.ops.addAndGet(ops.size.toLong()) ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) } - GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreOrMixedModeEnabled)) + GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) } } } - private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreOrMixedModeEnabled: Boolean): Long { + private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long { // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store // enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to // lastKnownGlobalCheckpoint in such cases. - return if (isRemoteStoreOrMixedModeEnabled) { + return if (isRemoteEnabledOrMigrating) { indexShard.lastKnownGlobalCheckpoint } else { indexShard.lastSyncedGlobalCheckpoint @@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator { val shardIt = state.routingTable().shardRoutingTable(request.request().shardId) // Random active shards - return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) || ValidationUtil.isMixedModeEnabledCluster(clusterService)) shardIt.primaryShardIt() + return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) || ValidationUtil.isRemoteMigrating(clusterService)) shardIt.primaryShardIt() else shardIt.activeInitializingShardsRandomIt() } } \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt index d4a1340b..384a53f5 100644 --- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt +++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt @@ -161,7 +161,7 @@ object ValidationUtil { return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false } - fun isMixedModeEnabledCluster(clusterService: ClusterService): Boolean { + fun isRemoteMigrating(clusterService: ClusterService): Boolean { return clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED) } }