Skip to content

Commit

Permalink
[ISSUE #7547] Let consumer be aware of message queue assignment change (
Browse files Browse the repository at this point in the history
#7548)

* let consumer be aware of message queue assignment change

Signed-off-by: Li Zhanhui <[email protected]>

* add unit test for DefaultMQPushConsumer#setMessageQueueListener

Signed-off-by: Li Zhanhui <[email protected]>

* fix: bazel build warnings

Signed-off-by: Zhanhui Li <[email protected]>

* fix: set MixCommitlogTest test size as medium

Signed-off-by: Zhanhui Li <[email protected]>

* allow cache bazel test results

Signed-off-by: Li Zhanhui <[email protected]>

* fix code style issue by removing unused imports

Signed-off-by: Li Zhanhui <[email protected]>

* fix #7552

Signed-off-by: Zhanhui Li <[email protected]>

---------

Signed-off-by: Li Zhanhui <[email protected]>
Signed-off-by: Zhanhui Li <[email protected]>
  • Loading branch information
lizhanhui authored Nov 13, 2023
1 parent 27759f3 commit 15d32db
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bazel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ jobs:
- name: Build
run: bazel build --config=remote //...
- name: Run Tests
run: bazel test --config=remote --nocache_test_results //...
run: bazel test --config=remote //...
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private MessageListener messageListener;

/**
* Listener to call if message queue assignment is changed.
*/
private MessageQueueListener messageQueueListener;

/**
* Offset Storage
*/
Expand Down Expand Up @@ -987,4 +992,12 @@ public boolean isClientRebalance() {
public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}

public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}

public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,22 @@
*/
public interface MQConsumer extends MQAdmin {
/**
* If consuming failure,message will be send back to the brokers,and delay consuming some time
* If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
* interval specified in delay level.
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;

/**
* If consuming failure,message will be send back to the broker,and delay consuming some time
* If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
* interval specified in delay level.
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

/**
* Fetch message queues from consumer cache according to the topic
* Fetch message queues from consumer cache pertaining to the given topic.
*
* @param topic message topic
* @return queue set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public interface MessageQueueListener {
/**
* @param topic message topic
* @param mqAll all queues in this message topic
* @param mqDivided collection of queues,assigned to the current consumer
* @param mqAssigned collection of queues, assigned to the current consumer
*/
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqAssigned);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
Expand Down Expand Up @@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long queueMaxSpanFlowControlTimes = 0;

//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};

private static final int MAX_POP_INVISIBLE_TIME = 300000;
private static final int MIN_POP_INVISIBLE_TIME = 5000;
Expand Down Expand Up @@ -1553,4 +1554,11 @@ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenExcept
int[] getPopDelayLevel() {
return popDelayLevel;
}

public MessageQueueListener getMessageQueueListener() {
if (null == defaultMQPushConsumer) {
return null;
}
return defaultMQPushConsumer.getMessageQueueListener();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel,

@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
/**
/*
* When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
Expand Down Expand Up @@ -82,6 +83,11 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa

// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);

MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener();
if (null != messageQueueListener) {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static LocalRemotingCommand createRequestCommand(int code, CommandCustomH
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
cmd.makeCustomHeaderToNet();
return cmd;
}

Expand Down
1 change: 1 addition & 0 deletions remoting/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_jetbrains_annotations",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
Expand Down
1 change: 1 addition & 0 deletions store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ GenTestRules(
"src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest",
"src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
"src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest",
"src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest",
],
test_files = glob(["src/test/java/**/*Test.java"]),
deps = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.test.client.consumer.balance;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.base.BaseConf;
Expand Down Expand Up @@ -112,4 +114,19 @@ public void test3ConsumerAndCrashOne() {
consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}

@Test
public void testMessageQueueListener() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
// Register message queue listener
consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown());

// Without message queue listener
RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListener());

Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
}
1 change: 1 addition & 0 deletions tieredstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ java_library(
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:com_alibaba_fastjson",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:commons_collections_commons_collections",
],
)

Expand Down
1 change: 1 addition & 0 deletions tools/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ java_library(
"@maven//:commons_collections_commons_collections",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
],
)

Expand Down

0 comments on commit 15d32db

Please sign in to comment.