diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index b1057e2a8d4..fd5e214865f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java new file mode 100644 index 00000000000..43f7ae53df8 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java @@ -0,0 +1,79 @@ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class PopRebalanceCacheManager { + + private static final long LOCK_TIMEOUT_MILLIS = 3000L; + + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + // Cache with pop mode rebalancing under the condition that consumer groups and queues remain unchanged + private final ConcurrentMap>> loadBalanceDateTable = new ConcurrentHashMap<>(); + + private final ConcurrentMap> topicCidAll = new ConcurrentHashMap<>(); + + private final ConcurrentMap> topicMqAll = new ConcurrentHashMap<>(); + + private final Lock popRebalanceCacheLock = new ReentrantLock(); + + public Set getLoadBalanceDate(List mqAll, List cidAll, String topic, + String clientId, String strategyName, int popShareQueueNum) { + try { + if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + // Check if there is caches + ConcurrentHashMap> topicCache = loadBalanceDateTable.get(topic); + if(topicCache == null) { + return null; + } + + // Check whether the consumer group and queue information have changed + List oldMqAll = topicMqAll.get(topic); + List oldCidAll = topicCidAll.get(topic); + if(oldMqAll == null || oldCidAll == null || !oldMqAll.equals(cidAll) || !oldCidAll.equals(mqAll)){ + return null; + } + + return topicCache.get(clientId + "_" + strategyName + "_" + popShareQueueNum); + } finally { + this.popRebalanceCacheLock.unlock(); + } + } + } catch (InterruptedException e) { + log.warn("PopRebalanceCacheManager getLoadBalanceDate Exception", e); + } + + return null; + } + + public void putLoadBalanceDate(List mqAll, List cidAll, String topic, String clientId, String strategyName, int popShareQueueNum, + Set loadBalanceDate) { + try { + if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + topicCidAll.put(topic, cidAll); + topicMqAll.put(topic, mqAll); + + loadBalanceDateTable.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()) + .put(clientId + "_" + strategyName + "_" + popShareQueueNum, loadBalanceDate); + } finally { + this.popRebalanceCacheLock.unlock(); + } + } + } catch (InterruptedException e) { + log.warn("PopRebalanceCacheManager putLoadBalanceDate Exception", e); + } + } + +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java index 2f4cb7b15f8..5ec5a2b29ea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java @@ -57,6 +57,8 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor { private final ConcurrentHashMap name2LoadStrategy = new ConcurrentHashMap<>(); + private final PopRebalanceCacheManager popRebalanceCacheManager; + private MessageRequestModeManager messageRequestModeManager; public QueryAssignmentProcessor(final BrokerController brokerController) { @@ -71,6 +73,8 @@ public QueryAssignmentProcessor(final BrokerController brokerController) { this.messageRequestModeManager = new MessageRequestModeManager(brokerController); this.messageRequestModeManager.load(); + + this.popRebalanceCacheManager = new PopRebalanceCacheManager(); } @Override @@ -157,6 +161,8 @@ private RemotingCommand queryAssignment(ChannelHandlerContext ctx, RemotingComma * @param clientId * @param messageModel * @param strategyName + * @param setMessageRequestModeRequestBody + * @param ctx * @return the MessageQueues assigned to this client */ private Set doLoadBalance(final String topic, final String consumerGroup, final String clientId, @@ -207,6 +213,14 @@ private Set doLoadBalance(final String topic, final String consume mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); + + // search cache, avoiding unnecessary allocate + int popShareQueueNum = setMessageRequestModeRequestBody.getPopShareQueueNum(); + assignedQueueSet = this.popRebalanceCacheManager.getLoadBalanceDate(mqAll, cidAll, topic, clientId, strategyName, popShareQueueNum); + if (assignedQueueSet != null) { + return assignedQueueSet; + } + List allocateResult = null; try { @@ -218,7 +232,7 @@ private Set doLoadBalance(final String topic, final String consume if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) { allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll, - cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum()); + cidAll, popShareQueueNum); } else { allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll); @@ -232,6 +246,10 @@ private Set doLoadBalance(final String topic, final String consume if (allocateResult != null) { assignedQueueSet.addAll(allocateResult); } + + // add cache, for easy use next time + this.popRebalanceCacheManager.putLoadBalanceDate(mqAll, cidAll, topic, clientId, strategyName, popShareQueueNum, assignedQueueSet); + break; } default: