From f230bd91f2b22095a116a4acdfde63a4c6216e68 Mon Sep 17 00:00:00 2001 From: wgl <2216348784@qq.com> Date: Mon, 13 Jan 2025 21:44:56 +0800 Subject: [PATCH 1/3] Load balancing cache in POP consumption mode --- .../rocketmq/broker/BrokerController.java | 25 ++--- .../broker/client/ConsumerManager.java | 16 +++- .../processor/PopRebalanceCacheManager.java | 95 +++++++++++++++++++ .../processor/QueryAssignmentProcessor.java | 17 +++- .../broker/topic/TopicRouteInfoManager.java | 3 + .../broker/client/ConsumerManagerTest.java | 2 +- 6 files changed, 137 insertions(+), 21 deletions(-) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 744aba19118..98329188f48 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -79,23 +79,7 @@ import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; -import org.apache.rocketmq.broker.processor.AckMessageProcessor; -import org.apache.rocketmq.broker.processor.AdminBrokerProcessor; -import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor; -import org.apache.rocketmq.broker.processor.ClientManageProcessor; -import org.apache.rocketmq.broker.processor.ConsumerManageProcessor; -import org.apache.rocketmq.broker.processor.EndTransactionProcessor; -import org.apache.rocketmq.broker.processor.NotificationProcessor; -import org.apache.rocketmq.broker.processor.PeekMessageProcessor; -import org.apache.rocketmq.broker.processor.PollingInfoProcessor; -import org.apache.rocketmq.broker.processor.PopInflightMessageCounter; -import org.apache.rocketmq.broker.processor.PopMessageProcessor; -import org.apache.rocketmq.broker.processor.PullMessageProcessor; -import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor; -import org.apache.rocketmq.broker.processor.QueryMessageProcessor; -import org.apache.rocketmq.broker.processor.RecallMessageProcessor; -import org.apache.rocketmq.broker.processor.ReplyMessageProcessor; -import org.apache.rocketmq.broker.processor.SendMessageProcessor; +import org.apache.rocketmq.broker.processor.*; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; @@ -219,6 +203,7 @@ public class BrokerController { protected final ConsumerIdsChangeListener consumerIdsChangeListener; protected final EndTransactionProcessor endTransactionProcessor; private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); + private final PopRebalanceCacheManager popRebalanceCacheManager = new PopRebalanceCacheManager(); private final TopicRouteInfoManager topicRouteInfoManager; protected BrokerOuterAPI brokerOuterAPI; protected ScheduledExecutorService scheduledExecutorService; @@ -375,7 +360,7 @@ public BrokerController( this.replyMessageProcessor = new ReplyMessageProcessor(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); - this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig); + this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig, this); this.producerManager = new ProducerManager(this.brokerStatsManager); this.consumerFilterManager = new ConsumerFilterManager(this); this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this); @@ -2525,6 +2510,10 @@ public TopicRouteInfoManager getTopicRouteInfoManager() { return this.topicRouteInfoManager; } + public PopRebalanceCacheManager getPopRebalanceCacheManager() { + return this.popRebalanceCacheManager; + } + public BlockingQueue getClientManagerThreadPoolQueue() { return clientManagerThreadPoolQueue; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index b1057e2a8d4..7eeaf780d97 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -48,6 +49,7 @@ public class ConsumerManager { protected final BrokerStatsManager brokerStatsManager; private final long channelExpiredTimeout; private final long subscriptionExpiredTimeout; + private BrokerController brokerController; public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) { this.consumerIdsChangeListenerList.add(consumerIdsChangeListener); @@ -57,11 +59,12 @@ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener } public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, - final BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig) { + final BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig, BrokerController brokerController) { this.consumerIdsChangeListenerList.add(consumerIdsChangeListener); this.brokerStatsManager = brokerStatsManager; this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout(); this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout(); + this.brokerController = brokerController; } public ClientChannelInfo findChannel(final String group, final String clientId) { @@ -135,6 +138,9 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe Entry next = it.next(); ConsumerGroupInfo info = next.getValue(); ClientChannelInfo clientChannelInfo = info.doChannelCloseEvent(remoteAddr, channel); + + brokerController.getPopRebalanceCacheManager().removeTopicCaches(info.getSubscribeTopics()); + if (clientChannelInfo != null) { callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics()); if (info.getChannelInfoTable().isEmpty()) { @@ -207,6 +213,8 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo); + brokerController.getPopRebalanceCacheManager().removeTopicCaches(consumerGroupInfo.getSubscribeTopics()); + return r1 || r2; } @@ -226,6 +234,9 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne if (null != this.brokerStatsManager) { this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start)); } + + brokerController.getPopRebalanceCacheManager().removeTopicCaches(consumerGroupInfo.getSubscribeTopics()); + return updateChannelRst; } @@ -234,6 +245,9 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { boolean removed = consumerGroupInfo.unregisterChannel(clientChannelInfo); + + brokerController.getPopRebalanceCacheManager().removeTopicCaches(consumerGroupInfo.getSubscribeTopics()); + if (removed) { callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java new file mode 100644 index 00000000000..7d91cc29db0 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java @@ -0,0 +1,95 @@ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class PopRebalanceCacheManager { + private static final long LOCK_TIMEOUT_MILLIS = 3000L; + + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + // Cache with pop mode rebalancing under the condition that consumer groups and queues remain unchanged + private final ConcurrentMap>> loadBalanceDateTable = new ConcurrentHashMap<>(); + + // Version numbers to avoid cache invalidation during the allocation of old requests, resulting in incorrect cache updates + private final ConcurrentMap versionMap = new ConcurrentHashMap<>(); + + private final Lock popRebalanceCacheLock = new ReentrantLock(); + + public long getVersion(String topic) { + long version = 0L; + try { + if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + version = versionMap.getOrDefault(topic, 0L); + } + } catch (InterruptedException e) { + log.warn("PopRebalanceCacheManager getVersion Exception", e); + } + return version; + } + + public Set getLoadBalanceDate(String topic, String clientId, String strategyName, int popShareQueueNum) { + Set loadBalanceDateSet = null; + try { + if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + ConcurrentHashMap> topicCache = loadBalanceDateTable.get(topic); + loadBalanceDateSet = topicCache.get(clientId + "_" + strategyName + "_" + popShareQueueNum); + } finally { + this.popRebalanceCacheLock.unlock(); + } + } + } catch (InterruptedException e) { + log.warn("PopRebalanceCacheManager getLoadBalanceDate Exception", e); + } + return loadBalanceDateSet; + } + + public void putLoadBalanceDate(String topic, String clientId, String strategyName, int popShareQueueNum, + Set loadBalanceDate, long oldVersion) { + try { + if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + if (getVersion(topic) == oldVersion) { + loadBalanceDateTable.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()) + .put(clientId + "_" + strategyName + "_" + popShareQueueNum, loadBalanceDate); + versionMap.put(topic, oldVersion + 1); + } + } finally { + this.popRebalanceCacheLock.unlock(); + } + } + } catch (InterruptedException e) { + log.warn("PopRebalanceCacheManager putLoadBalanceDate Exception", e); + } + } + + public void removeTopicCaches(Set topics) { + for (String topic : topics) { + removeTopicCache(topic); + } + } + + public void removeTopicCache(String topic) { + try { + if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + loadBalanceDateTable.remove(topic); + versionMap.merge(topic, 1L, Long::sum); + } finally { + this.popRebalanceCacheLock.unlock(); + } + } + } catch (InterruptedException e) { + log.warn("PopRebalanceCacheManager removeTopicCache Exception", e); + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java index 2f4cb7b15f8..7945eb52bec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java @@ -157,6 +157,8 @@ private RemotingCommand queryAssignment(ChannelHandlerContext ctx, RemotingComma * @param clientId * @param messageModel * @param strategyName + * @param setMessageRequestModeRequestBody + * @param ctx * @return the MessageQueues assigned to this client */ private Set doLoadBalance(final String topic, final String consumerGroup, final String clientId, @@ -193,6 +195,15 @@ private Set doLoadBalance(final String topic, final String consume return mqSet; } + // search cache, avoiding unnecessary allocate + int popShareQueueNum = setMessageRequestModeRequestBody.getPopShareQueueNum(); + assignedQueueSet = brokerController.getPopRebalanceCacheManager().getLoadBalanceDate(topic, clientId, strategyName, popShareQueueNum); + if (assignedQueueSet != null) { + return assignedQueueSet; + } + + long version = brokerController.getPopRebalanceCacheManager().getVersion(topic); + List cidAll = null; ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup); if (consumerGroupInfo != null) { @@ -218,7 +229,7 @@ private Set doLoadBalance(final String topic, final String consume if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) { allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll, - cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum()); + cidAll, popShareQueueNum); } else { allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll); @@ -232,6 +243,10 @@ private Set doLoadBalance(final String topic, final String consume if (allocateResult != null) { assignedQueueSet.addAll(allocateResult); } + + // add cache, for easy use next time + brokerController.getPopRebalanceCacheManager().putLoadBalanceDate(topic, clientId, strategyName, popShareQueueNum, assignedQueueSet, version); + break; } default: diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java index 11bde5f5fe2..e9ce5ad6c92 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java @@ -164,6 +164,9 @@ private boolean updateSubscribeInfoTable(TopicRouteData topicRouteData, String t log.info("the topic[{}] subscribe message queue changed, old[{}] ,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo); topicSubscribeInfoTable.put(topic, newSubscribeInfo); + + brokerController.getPopRebalanceCacheManager().removeTopicCache(topic); + return true; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java index a23ad20037c..3e4287a4cc3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java @@ -76,7 +76,7 @@ public void before() { clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, LanguageCode.JAVA, VERSION); DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController); BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig); - consumerManager = spy(new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig)); + consumerManager = spy(new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig, brokerController)); ConsumerFilterManager consumerFilterManager = mock(ConsumerFilterManager.class); when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager); } From 29263c070060d51f996a6fcf7e883c8eb1cc4445 Mon Sep 17 00:00:00 2001 From: wgl <2216348784@qq.com> Date: Tue, 14 Jan 2025 22:19:22 +0800 Subject: [PATCH 2/3] fix:float the cache to the top doRebalancing method --- .../rocketmq/broker/BrokerController.java | 7 +- .../broker/client/ConsumerManager.java | 19 +---- .../processor/PopRebalanceCacheManager.java | 74 ++++++++----------- .../processor/QueryAssignmentProcessor.java | 23 +++--- .../broker/topic/TopicRouteInfoManager.java | 3 - 5 files changed, 45 insertions(+), 81 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 98329188f48..423ff45aa9f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -203,7 +203,6 @@ public class BrokerController { protected final ConsumerIdsChangeListener consumerIdsChangeListener; protected final EndTransactionProcessor endTransactionProcessor; private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); - private final PopRebalanceCacheManager popRebalanceCacheManager = new PopRebalanceCacheManager(); private final TopicRouteInfoManager topicRouteInfoManager; protected BrokerOuterAPI brokerOuterAPI; protected ScheduledExecutorService scheduledExecutorService; @@ -360,7 +359,7 @@ public BrokerController( this.replyMessageProcessor = new ReplyMessageProcessor(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); - this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig, this); + this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig); this.producerManager = new ProducerManager(this.brokerStatsManager); this.consumerFilterManager = new ConsumerFilterManager(this); this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this); @@ -2510,10 +2509,6 @@ public TopicRouteInfoManager getTopicRouteInfoManager() { return this.topicRouteInfoManager; } - public PopRebalanceCacheManager getPopRebalanceCacheManager() { - return this.popRebalanceCacheManager; - } - public BlockingQueue getClientManagerThreadPoolQueue() { return clientManagerThreadPoolQueue; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 7eeaf780d97..b0466d975a1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; -import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -49,7 +48,6 @@ public class ConsumerManager { protected final BrokerStatsManager brokerStatsManager; private final long channelExpiredTimeout; private final long subscriptionExpiredTimeout; - private BrokerController brokerController; public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) { this.consumerIdsChangeListenerList.add(consumerIdsChangeListener); @@ -59,12 +57,11 @@ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener } public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, - final BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig, BrokerController brokerController) { + final BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig) { this.consumerIdsChangeListenerList.add(consumerIdsChangeListener); this.brokerStatsManager = brokerStatsManager; this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout(); this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout(); - this.brokerController = brokerController; } public ClientChannelInfo findChannel(final String group, final String clientId) { @@ -138,9 +135,6 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe Entry next = it.next(); ConsumerGroupInfo info = next.getValue(); ClientChannelInfo clientChannelInfo = info.doChannelCloseEvent(remoteAddr, channel); - - brokerController.getPopRebalanceCacheManager().removeTopicCaches(info.getSubscribeTopics()); - if (clientChannelInfo != null) { callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics()); if (info.getChannelInfoTable().isEmpty()) { @@ -212,9 +206,6 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie } callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo); - - brokerController.getPopRebalanceCacheManager().removeTopicCaches(consumerGroupInfo.getSubscribeTopics()); - return r1 || r2; } @@ -234,10 +225,7 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne if (null != this.brokerStatsManager) { this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start)); } - - brokerController.getPopRebalanceCacheManager().removeTopicCaches(consumerGroupInfo.getSubscribeTopics()); - - return updateChannelRst; + return updateChannelRst; } public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, @@ -245,9 +233,6 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { boolean removed = consumerGroupInfo.unregisterChannel(clientChannelInfo); - - brokerController.getPopRebalanceCacheManager().removeTopicCaches(consumerGroupInfo.getSubscribeTopics()); - if (removed) { callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java index 7d91cc29db0..43f7ae53df8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopRebalanceCacheManager.java @@ -4,6 +4,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -12,6 +13,7 @@ import java.util.concurrent.locks.ReentrantLock; public class PopRebalanceCacheManager { + private static final long LOCK_TIMEOUT_MILLIS = 3000L; private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -19,77 +21,59 @@ public class PopRebalanceCacheManager { // Cache with pop mode rebalancing under the condition that consumer groups and queues remain unchanged private final ConcurrentMap>> loadBalanceDateTable = new ConcurrentHashMap<>(); - // Version numbers to avoid cache invalidation during the allocation of old requests, resulting in incorrect cache updates - private final ConcurrentMap versionMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> topicCidAll = new ConcurrentHashMap<>(); - private final Lock popRebalanceCacheLock = new ReentrantLock(); + private final ConcurrentMap> topicMqAll = new ConcurrentHashMap<>(); - public long getVersion(String topic) { - long version = 0L; - try { - if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - version = versionMap.getOrDefault(topic, 0L); - } - } catch (InterruptedException e) { - log.warn("PopRebalanceCacheManager getVersion Exception", e); - } - return version; - } + private final Lock popRebalanceCacheLock = new ReentrantLock(); - public Set getLoadBalanceDate(String topic, String clientId, String strategyName, int popShareQueueNum) { - Set loadBalanceDateSet = null; + public Set getLoadBalanceDate(List mqAll, List cidAll, String topic, + String clientId, String strategyName, int popShareQueueNum) { try { if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { + // Check if there is caches ConcurrentHashMap> topicCache = loadBalanceDateTable.get(topic); - loadBalanceDateSet = topicCache.get(clientId + "_" + strategyName + "_" + popShareQueueNum); - } finally { - this.popRebalanceCacheLock.unlock(); - } - } - } catch (InterruptedException e) { - log.warn("PopRebalanceCacheManager getLoadBalanceDate Exception", e); - } - return loadBalanceDateSet; - } + if(topicCache == null) { + return null; + } - public void putLoadBalanceDate(String topic, String clientId, String strategyName, int popShareQueueNum, - Set loadBalanceDate, long oldVersion) { - try { - if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - if (getVersion(topic) == oldVersion) { - loadBalanceDateTable.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()) - .put(clientId + "_" + strategyName + "_" + popShareQueueNum, loadBalanceDate); - versionMap.put(topic, oldVersion + 1); + // Check whether the consumer group and queue information have changed + List oldMqAll = topicMqAll.get(topic); + List oldCidAll = topicCidAll.get(topic); + if(oldMqAll == null || oldCidAll == null || !oldMqAll.equals(cidAll) || !oldCidAll.equals(mqAll)){ + return null; } + + return topicCache.get(clientId + "_" + strategyName + "_" + popShareQueueNum); } finally { this.popRebalanceCacheLock.unlock(); } } } catch (InterruptedException e) { - log.warn("PopRebalanceCacheManager putLoadBalanceDate Exception", e); + log.warn("PopRebalanceCacheManager getLoadBalanceDate Exception", e); } - } - public void removeTopicCaches(Set topics) { - for (String topic : topics) { - removeTopicCache(topic); - } + return null; } - public void removeTopicCache(String topic) { + public void putLoadBalanceDate(List mqAll, List cidAll, String topic, String clientId, String strategyName, int popShareQueueNum, + Set loadBalanceDate) { try { if (this.popRebalanceCacheLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - loadBalanceDateTable.remove(topic); - versionMap.merge(topic, 1L, Long::sum); + topicCidAll.put(topic, cidAll); + topicMqAll.put(topic, mqAll); + + loadBalanceDateTable.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()) + .put(clientId + "_" + strategyName + "_" + popShareQueueNum, loadBalanceDate); } finally { this.popRebalanceCacheLock.unlock(); } } } catch (InterruptedException e) { - log.warn("PopRebalanceCacheManager removeTopicCache Exception", e); + log.warn("PopRebalanceCacheManager putLoadBalanceDate Exception", e); } } + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java index 7945eb52bec..5ec5a2b29ea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java @@ -57,6 +57,8 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor { private final ConcurrentHashMap name2LoadStrategy = new ConcurrentHashMap<>(); + private final PopRebalanceCacheManager popRebalanceCacheManager; + private MessageRequestModeManager messageRequestModeManager; public QueryAssignmentProcessor(final BrokerController brokerController) { @@ -71,6 +73,8 @@ public QueryAssignmentProcessor(final BrokerController brokerController) { this.messageRequestModeManager = new MessageRequestModeManager(brokerController); this.messageRequestModeManager.load(); + + this.popRebalanceCacheManager = new PopRebalanceCacheManager(); } @Override @@ -195,15 +199,6 @@ private Set doLoadBalance(final String topic, final String consume return mqSet; } - // search cache, avoiding unnecessary allocate - int popShareQueueNum = setMessageRequestModeRequestBody.getPopShareQueueNum(); - assignedQueueSet = brokerController.getPopRebalanceCacheManager().getLoadBalanceDate(topic, clientId, strategyName, popShareQueueNum); - if (assignedQueueSet != null) { - return assignedQueueSet; - } - - long version = brokerController.getPopRebalanceCacheManager().getVersion(topic); - List cidAll = null; ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup); if (consumerGroupInfo != null) { @@ -218,6 +213,14 @@ private Set doLoadBalance(final String topic, final String consume mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); + + // search cache, avoiding unnecessary allocate + int popShareQueueNum = setMessageRequestModeRequestBody.getPopShareQueueNum(); + assignedQueueSet = this.popRebalanceCacheManager.getLoadBalanceDate(mqAll, cidAll, topic, clientId, strategyName, popShareQueueNum); + if (assignedQueueSet != null) { + return assignedQueueSet; + } + List allocateResult = null; try { @@ -245,7 +248,7 @@ private Set doLoadBalance(final String topic, final String consume } // add cache, for easy use next time - brokerController.getPopRebalanceCacheManager().putLoadBalanceDate(topic, clientId, strategyName, popShareQueueNum, assignedQueueSet, version); + this.popRebalanceCacheManager.putLoadBalanceDate(mqAll, cidAll, topic, clientId, strategyName, popShareQueueNum, assignedQueueSet); break; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java index e9ce5ad6c92..11bde5f5fe2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java @@ -164,9 +164,6 @@ private boolean updateSubscribeInfoTable(TopicRouteData topicRouteData, String t log.info("the topic[{}] subscribe message queue changed, old[{}] ,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo); topicSubscribeInfoTable.put(topic, newSubscribeInfo); - - brokerController.getPopRebalanceCacheManager().removeTopicCache(topic); - return true; } From 6a3611e50dcce0b45f28393171c78ad00c5b4854 Mon Sep 17 00:00:00 2001 From: wgl <2216348784@qq.com> Date: Tue, 14 Jan 2025 22:47:18 +0800 Subject: [PATCH 3/3] fix:Avoid format changes caused by accidental touch --- .../org/apache/rocketmq/broker/client/ConsumerManager.java | 4 +++- .../apache/rocketmq/broker/client/ConsumerManagerTest.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index b0466d975a1..fd5e214865f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -206,6 +207,7 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie } callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo); + return r1 || r2; } @@ -225,7 +227,7 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne if (null != this.brokerStatsManager) { this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start)); } - return updateChannelRst; + return updateChannelRst; } public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java index 3e4287a4cc3..a23ad20037c 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java @@ -76,7 +76,7 @@ public void before() { clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, LanguageCode.JAVA, VERSION); DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController); BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig); - consumerManager = spy(new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig, brokerController)); + consumerManager = spy(new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig)); ConsumerFilterManager consumerFilterManager = mock(ConsumerFilterManager.class); when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager); }