Skip to content

Commit

Permalink
Fix the issue of duplicate consumption in LMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin committed Jan 3, 2025
1 parent f32fe78 commit 44af35f
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,16 +350,16 @@ private void cleanUnusedResource() {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
String topic = entry.getKey();
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic);
topicCidMapIter.remove();
continue;
}
Iterator<Map.Entry<String, Byte>> cidMapIter = entry.getValue().entrySet().iterator();
while (cidMapIter.hasNext()) {
Map.Entry<String, Byte> cidEntry = cidMapIter.next();
String cid = cidEntry.getKey();
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subcriptionGroup {} of topic {} in topicCidMap!", cid, topic);
cidMapIter.remove();
}
}
Expand All @@ -380,12 +380,12 @@ private void cleanUnusedResource() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic);
pollingMapIter.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subcriptionGroup {} of topic {} in pollingMap!", cid, topic);
pollingMapIter.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ protected void autoClean() {
continue;
}

if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) {
if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) {
iterator.remove();
log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private RemotingCommand getSubscriptionGroup(ChannelHandlerContext ctx,
GetSubscriptionGroupConfigRequestHeader requestHeader = (GetSubscriptionGroupConfigRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup());
SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (groupConfig == null) {
LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup());
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
Expand Down Expand Up @@ -2444,7 +2444,7 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
}
// groupSysFlag
if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) {
SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup());
SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (groupConfig != null) {
request.addExtField("groupSysFlag", String.valueOf(groupConfig.getGroupSysFlag()));
}
Expand Down Expand Up @@ -2933,7 +2933,7 @@ private RemotingCommand getTopicConfig(ChannelHandlerContext ctx,
GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (topicConfig == null) {
LOGGER.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
//be care of the response code, should set "not-exist" explicitly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ private void scanGarbage() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic);
POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic);
iterator.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("[PopBuffer]remove nonexistent subcriptionGroup {} of topic {} in buffer!", cid, topic);
iterator.remove();
continue;
}
Expand Down

0 comments on commit 44af35f

Please sign in to comment.