From 8460a97fcbf62ff9aa396fb53952e94b830b4e8f Mon Sep 17 00:00:00 2001 From: Yukti Dave Date: Wed, 15 Jan 2025 13:24:31 +0530 Subject: [PATCH] adding a small change --- .../impl/producer/TopicPublishInfo.java | 50 ++----------------- 1 file changed, 5 insertions(+), 45 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index 917fe57aa87..71a8509e7d6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -18,8 +18,6 @@ import java.util.ArrayList; import java.util.List; - -import com.google.common.base.Preconditions; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -32,10 +30,6 @@ public class TopicPublishInfo { private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; - public interface QueueFilter { - boolean filter(MessageQueue mq); - } - public boolean isOrderTopic() { return orderTopic; } @@ -72,50 +66,16 @@ public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { this.haveTopicRouterInfo = haveTopicRouterInfo; } - public MessageQueue selectOneMessageQueue(QueueFilter ...filter) { - return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter); - } - - private MessageQueue selectOneMessageQueue(List messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) { - if (messageQueueList == null || messageQueueList.isEmpty()) { - return null; - } - - if (filter != null && filter.length != 0) { - for (int i = 0; i < messageQueueList.size(); i++) { - int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); - MessageQueue mq = messageQueueList.get(index); - boolean filterResult = true; - for (QueueFilter f: filter) { - Preconditions.checkNotNull(f); - filterResult &= f.filter(mq); - } - if (filterResult) { - return mq; - } - } - - return null; - } - - int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); - return messageQueueList.get(index); - } - - public void resetIndex() { - this.sendWhichQueue.reset(); - } - public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { - MessageQueue mq = selectOneMessageQueue(); + MessageQueue mq = selectOneMessageQueue() if (!mq.getBrokerName().equals(lastBrokerName)) { - return mq; + return mq; } - } + } return selectOneMessageQueue(); } } @@ -127,7 +87,7 @@ public MessageQueue selectOneMessageQueue() { return this.messageQueueList.get(pos); } - public int getWriteQueueNumsByBroker(final String brokerName) { + public int getWriteQueueIdByBroker(final String brokerName) { for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); if (queueData.getBrokerName().equals(brokerName)) { @@ -151,4 +111,4 @@ public TopicRouteData getTopicRouteData() { public void setTopicRouteData(final TopicRouteData topicRouteData) { this.topicRouteData = topicRouteData; } -} +} \ No newline at end of file