Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
YoWuwuuuw committed Jan 14, 2025
2 parents 6a3611e + 5c5db84 commit aea3ec3
Show file tree
Hide file tree
Showing 80 changed files with 4,693 additions and 327 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ $ docker run -it --net=host apache/rocketmq ./mqnamesrv
**2) Start Broker**

```shell
$ docker run -it --net=host --mount source=/tmp/store,target=/home/rocketmq/store apache/rocketmq ./mqbroker -n localhost:9876
$ docker run -it --net=host --mount type=bind,source=/tmp/store,target=/home/rocketmq/store apache/rocketmq ./mqbroker -n localhost:9876
```

### Run RocketMQ in Kubernetes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,13 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
Resource group;
switch (command.getCode()) {
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
topic = Resource.ofTopic(fields.get(TOPIC));
result.add(DefaultAuthorizationContext.of(subject, topic, Arrays.asList(Action.PUB, Action.SUB, Action.GET), sourceIp));
if (NamespaceUtil.isRetryTopic(fields.get(TOPIC))) {
group = Resource.ofGroup(fields.get(TOPIC));
result.add(DefaultAuthorizationContext.of(subject, group, Arrays.asList(Action.SUB, Action.GET), sourceIp));
} else {
topic = Resource.ofTopic(fields.get(TOPIC));
result.add(DefaultAuthorizationContext.of(subject, topic, Arrays.asList(Action.PUB, Action.SUB, Action.GET), sourceIp));
}
break;
case RequestCode.SEND_MESSAGE:
if (NamespaceUtil.isRetryTopic(fields.get(TOPIC))) {
Expand Down
1 change: 1 addition & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ java_library(
"//srvutil",
"//store",
"//tieredstore",
"@maven//:org_slf4j_slf4j_api",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,24 @@
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.*;
import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.RecallMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
Expand Down Expand Up @@ -182,6 +199,7 @@ public class BrokerController {
protected final ConsumerFilterManager consumerFilterManager;
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
protected final PopInflightMessageCounter popInflightMessageCounter;
protected final PopConsumerService popConsumerService;
protected final ProducerManager producerManager;
protected final ScheduleMessageService scheduleMessageService;
protected final ClientHousekeepingService clientHousekeepingService;
Expand Down Expand Up @@ -364,6 +382,7 @@ public BrokerController(
this.consumerFilterManager = new ConsumerFilterManager(this);
this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
this.popConsumerService = brokerConfig.isPopConsumerKVServiceInit() ? new PopConsumerService(this) : null;
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.scheduleMessageService = new ScheduleMessageService(this);
Expand Down Expand Up @@ -1298,6 +1317,10 @@ public PopInflightMessageCounter getPopInflightMessageCounter() {
return popInflightMessageCounter;
}

public PopConsumerService getPopConsumerService() {
return popConsumerService;
}

public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}
Expand Down Expand Up @@ -1401,12 +1424,13 @@ protected void shutdownBasicService() {
this.pullRequestHoldService.shutdown();
}

{
this.popMessageProcessor.getPopLongPollingService().shutdown();
this.popMessageProcessor.getQueueLockManager().shutdown();
if (this.popConsumerService != null) {
this.popConsumerService.shutdown();
}

{
this.popMessageProcessor.getPopLongPollingService().shutdown();
this.popMessageProcessor.getQueueLockManager().shutdown();
this.popMessageProcessor.getPopBufferMergeService().shutdown();
this.ackMessageProcessor.shutdownPopReviveService();
}
Expand Down Expand Up @@ -1657,18 +1681,26 @@ protected void startBasicService() throws Exception {

if (this.popMessageProcessor != null) {
this.popMessageProcessor.getPopLongPollingService().start();
this.popMessageProcessor.getPopBufferMergeService().start();
if (brokerConfig.isPopConsumerFSServiceInit()) {
this.popMessageProcessor.getPopBufferMergeService().start();
}
this.popMessageProcessor.getQueueLockManager().start();
}

if (this.ackMessageProcessor != null) {
this.ackMessageProcessor.startPopReviveService();
if (brokerConfig.isPopConsumerFSServiceInit()) {
this.ackMessageProcessor.startPopReviveService();
}
}

if (this.notificationProcessor != null) {
this.notificationProcessor.getPopLongPollingService().start();
}

if (this.popConsumerService != null) {
this.popConsumerService.start();
}

if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.fastjson.JSONObject;

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected RocksDBConfigManager rocksDBConfigManager;
protected transient RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
Expand Down Expand Up @@ -100,7 +100,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.CHARSET_UTF8);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
log.error("kv remove consumerOffset Failed, {}", topicAtGroup);
}
}

Expand All @@ -109,7 +109,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) {
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
log.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

public String rocksdbConfigFilePath() {
Expand All @@ -132,12 +132,17 @@ public synchronized void persist() {
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
this.rocksDBConfigManager.flushWAL();
} catch (Exception e) {
LOG.error("consumer offset persist Failed", e);
log.error("consumer offset persist Failed", e);
} finally {
writeBatch.close();
}
}

public synchronized void exportToJson() {
log.info("RocksDBConsumerOffsetManager export consumer offset to json file");
super.persist();
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {

protected RocksDBConfigManager rocksDBConfigManager;
protected transient RocksDBConfigManager rocksDBConfigManager;

public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
Expand Down Expand Up @@ -184,6 +184,11 @@ public synchronized void persist() {
}
}

public synchronized void exportToJson() {
log.info("RocksDBSubscriptionGroupManager export subscription group to json file");
super.persist();
}

public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

public class RocksDBTopicConfigManager extends TopicConfigManager {

protected RocksDBConfigManager rocksDBConfigManager;
protected transient RocksDBConfigManager rocksDBConfigManager;

public RocksDBTopicConfigManager(BrokerController brokerController) {
super(brokerController, false);
Expand Down Expand Up @@ -139,6 +139,11 @@ public synchronized void persist() {
}
}

public synchronized void exportToJson() {
log.info("RocksDBTopicConfigManager export topic config to json file");
super.persist();
}

public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread {
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
private long lastCleanTime = 0;

Expand All @@ -63,7 +63,8 @@ public PopLongPollingService(BrokerController brokerController, NettyRequestProc
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build();
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.notifyLast = notifyLast;
Expand Down Expand Up @@ -350,16 +351,16 @@ private void cleanUnusedResource() {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
String topic = entry.getKey();
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic);
topicCidMapIter.remove();
continue;
}
Iterator<Map.Entry<String, Byte>> cidMapIter = entry.getValue().entrySet().iterator();
while (cidMapIter.hasNext()) {
Map.Entry<String, Byte> cidEntry = cidMapIter.next();
String cid = cidEntry.getKey();
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic);
cidMapIter.remove();
}
}
Expand All @@ -380,12 +381,12 @@ private void cleanUnusedResource() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic);
pollingMapIter.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic);
pollingMapIter.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ protected void autoClean() {
continue;
}

if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) {
if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) {
iterator.remove();
log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs);
continue;
Expand Down
Loading

0 comments on commit aea3ec3

Please sign in to comment.