Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Oct 27, 2023
1 parent 033690e commit c10b5d3
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}

private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
private boolean rebalanceByTopic(final String topic, final boolean isOrder) throws MQClientException {
boolean balanced = true;
switch (messageModel) {
case BROADCASTING: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1134,14 +1134,10 @@ private int findBrokerVersion(String brokerName, String brokerAddr) {
return 0;
}

public List<String> findConsumerIdList(final String topic, final String group) {
public List<String> findConsumerIdList(final String topic, final String group) throws MQClientException {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
try {
this.updateTopicRouteInfoFromNameServer(topic);
} catch (MQClientException e) {
return null;
}
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public RebalancePushImplTest() {
}

@Test
public void testMessageQueueChanged_CountThreshold() {
public void testMessageQueueChanged_CountThreshold() throws MQClientException {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
Expand Down Expand Up @@ -100,7 +100,7 @@ private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set<MessageQue
rebalancePush.messageQueueChanged(topic, allocateResultSet, allocateResultSet);
}

private void init(final RebalancePushImpl rebalancePush) {
private void init(final RebalancePushImpl rebalancePush) throws MQClientException {
rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());

rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
Expand All @@ -111,7 +111,7 @@ private void init(final RebalancePushImpl rebalancePush) {
}

@Test
public void testMessageQueueChanged_SizeThreshold() {
public void testMessageQueueChanged_SizeThreshold() throws MQClientException {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullCo
MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServerWithoutException(anyString());
}

}

0 comments on commit c10b5d3

Please sign in to comment.