Skip to content

Commit

Permalink
adding a small change
Browse files Browse the repository at this point in the history
  • Loading branch information
yuktidave committed Jan 15, 2025
1 parent a275510 commit 8460a97
Showing 1 changed file with 5 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Preconditions;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
Expand All @@ -32,10 +30,6 @@ public class TopicPublishInfo {
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;

public interface QueueFilter {
boolean filter(MessageQueue mq);
}

public boolean isOrderTopic() {
return orderTopic;
}
Expand Down Expand Up @@ -72,50 +66,16 @@ public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
this.haveTopicRouterInfo = haveTopicRouterInfo;
}

public MessageQueue selectOneMessageQueue(QueueFilter ...filter) {
return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter);
}

private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
if (messageQueueList == null || messageQueueList.isEmpty()) {
return null;
}

if (filter != null && filter.length != 0) {
for (int i = 0; i < messageQueueList.size(); i++) {
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
MessageQueue mq = messageQueueList.get(index);
boolean filterResult = true;
for (QueueFilter f: filter) {
Preconditions.checkNotNull(f);
filterResult &= f.filter(mq);
}
if (filterResult) {
return mq;
}
}

return null;
}

int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
return messageQueueList.get(index);
}

public void resetIndex() {
this.sendWhichQueue.reset();
}

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
MessageQueue mq = selectOneMessageQueue();
MessageQueue mq = selectOneMessageQueue()
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
return mq;
}
}
}
return selectOneMessageQueue();
}
}
Expand All @@ -127,7 +87,7 @@ public MessageQueue selectOneMessageQueue() {
return this.messageQueueList.get(pos);
}

public int getWriteQueueNumsByBroker(final String brokerName) {
public int getWriteQueueIdByBroker(final String brokerName) {
for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
if (queueData.getBrokerName().equals(brokerName)) {
Expand All @@ -151,4 +111,4 @@ public TopicRouteData getTopicRouteData() {
public void setTopicRouteData(final TopicRouteData topicRouteData) {
this.topicRouteData = topicRouteData;
}
}
}

0 comments on commit 8460a97

Please sign in to comment.