From d9096b200db3e2bcebd802f664a4a2742bda6c2b Mon Sep 17 00:00:00 2001 From: zane-neo Date: Thu, 15 Aug 2024 11:21:07 +0800 Subject: [PATCH] draft PR to fix cluster not able to spin up issue when disk usage exceeds threshold Signed-off-by: zane-neo --- .../cluster/InternalClusterInfoService.java | 26 ++++++++++++++----- .../cluster/block/ClusterBlocks.java | 22 ++++++++++++++-- .../allocation/DiskThresholdMonitor.java | 5 ++++ 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java index e381b8f244bf3..260fa748d599d 100644 --- a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java @@ -155,13 +155,17 @@ void setUpdateFrequency(TimeValue updateFrequency) { @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.localNodeClusterManager() && refreshAndRescheduleRunnable.get() == null) { - logger.trace("elected as cluster-manager, scheduling cluster info update tasks"); - executeRefresh(event.state(), "became cluster-manager"); - - final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable(); - refreshAndRescheduleRunnable.set(newRunnable); - threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable); + if (event.localNodeClusterManager()) { + if (!event.state().blocks().indices().isEmpty()) { + executeRefreshImmediately(event.state()); + } else if (refreshAndRescheduleRunnable.get() == null) { + logger.trace("elected as cluster-manager, scheduling cluster info update tasks"); + executeRefresh(event.state(), "became cluster-manager"); + + final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable(); + refreshAndRescheduleRunnable.set(newRunnable); + threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable); + } } else if (event.localNodeClusterManager() == false) { refreshAndRescheduleRunnable.set(null); return; @@ -204,6 +208,14 @@ private void executeRefresh(ClusterState clusterState, String reason) { } } + private void executeRefreshImmediately(ClusterState clusterState) { + String reason = "became cluster-manager with indices blocked"; + if (!clusterState.nodes().getDataNodes().isEmpty()) { + logger.trace("refreshing cluster info immediately [{}]", reason); + new RefreshRunnable(reason).run(); + } + } + @Override public ClusterInfo getClusterInfo() { final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java index 02a20b7681ba7..4fcbbc3a9d135 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java @@ -67,9 +67,9 @@ public class ClusterBlocks extends AbstractDiffable { private final Set global; - private final Map> indicesBlocks; + private Map> indicesBlocks; - private final EnumMap levelHolders; + private EnumMap levelHolders; ClusterBlocks(Set global, final Map> indicesBlocks) { this.global = global; @@ -161,6 +161,24 @@ public boolean hasIndexBlock(String index, ClusterBlock block) { return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block); } + public void removeIndexBlock(String index, ClusterBlock block) { + Map> newIndicesBlocks = new HashMap<>(indicesBlocks); // copy to avoid UnsupportedOperationException> + for (Map.Entry> entry : indicesBlocks.entrySet()) { + String indexName = entry.getKey(); + Set clusterBlockSet = new HashSet<>(entry.getValue()); + if (indexName.equals(index)) { + clusterBlockSet.remove(block); + if (clusterBlockSet.isEmpty()) { + newIndicesBlocks.remove(indexName); + } else { + newIndicesBlocks.put(indexName, Collections.unmodifiableSet(clusterBlockSet)); + } + } + } + this.indicesBlocks = Collections.unmodifiableMap(newIndicesBlocks); + this.levelHolders = generateLevelHolders(global, indicesBlocks); + } + public boolean hasIndexBlockWithId(String index, int blockId) { final Set clusterBlocks = indicesBlocks.get(index); if (clusterBlocks != null) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index e6e5046ea28ee..9c784ad289208 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -40,7 +40,9 @@ import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.block.ClusterBlock; import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RerouteService; @@ -253,6 +255,9 @@ public void onNewInfo(ClusterInfo info) { } } else { + for (Map.Entry> indexBlockEntry : state.blocks().indices().entrySet()) { + state.blocks().removeIndexBlock(indexBlockEntry.getKey(), IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); + } nodesOverHighThresholdAndRelocating.remove(node);