From c10b5d315d0f8e6030a72541aefc3295f49743bf Mon Sep 17 00:00:00 2001 From: zhouxiang Date: Fri, 27 Oct 2023 17:25:29 +0800 Subject: [PATCH] fix --- .../rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +- .../rocketmq/client/impl/factory/MQClientInstance.java | 8 ++------ .../client/impl/consumer/RebalancePushImplTest.java | 6 +++--- .../trace/DefaultMQLitePullConsumerWithTraceTest.java | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 97d9460f827..b05e333cfb4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -296,7 +296,7 @@ public ConcurrentMap getSubscriptionInner() { return subscriptionInner; } - private boolean rebalanceByTopic(final String topic, final boolean isOrder) { + private boolean rebalanceByTopic(final String topic, final boolean isOrder) throws MQClientException { boolean balanced = true; switch (messageModel) { case BROADCASTING: { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f44ce90b8d8..042585f6672 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1134,14 +1134,10 @@ private int findBrokerVersion(String brokerName, String brokerAddr) { return 0; } - public List findConsumerIdList(final String topic, final String group) { + public List findConsumerIdList(final String topic, final String group) throws MQClientException { String brokerAddr = this.findBrokerAddrByTopic(topic); if (null == brokerAddr) { - try { - this.updateTopicRouteInfoFromNameServer(topic); - } catch (MQClientException e) { - return null; - } + this.updateTopicRouteInfoFromNameServer(topic); brokerAddr = this.findBrokerAddrByTopic(topic); } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java index f55b5869e56..d5832ab8b95 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -70,7 +70,7 @@ public RebalancePushImplTest() { } @Test - public void testMessageQueueChanged_CountThreshold() { + public void testMessageQueueChanged_CountThreshold() throws MQClientException { RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING, new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer); init(rebalancePush); @@ -100,7 +100,7 @@ private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory); - doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServerWithoutException(anyString()); } }