diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 6559c211cdb..fb371dce05f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -181,6 +181,15 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes if (!context.isFifo() && result.getNextBeginOffset() > OFFSET_NOT_EXIST) { this.brokerController.getConsumerOffsetManager().commitPullOffset( context.getClientHost(), context.getGroupId(), topicId, queueId, result.getNextBeginOffset()); + long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? offset : result.getNextBeginOffset(); + if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) { + long minOffset = popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId); + if (minOffset != OFFSET_NOT_EXIST) { + commitOffset = minOffset; + } + } + this.brokerController.getConsumerOffsetManager().commitOffset( + context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset); } return context; @@ -308,9 +317,6 @@ public CompletableFuture popAsync(String clientHost, long po CompletableFuture.completedFuture(popConsumerContext); try { - GetMessageResult finalGetMessageResult = new GetMessageResult(); - finalGetMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); - if (!fifo && preferRetry) { if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, @@ -493,7 +499,7 @@ public long revive(long currentTime, int maxCount) { record.setInvisibleTime(record.getInvisibleTime() + backoffInterval); record.setAttemptTimes(record.getAttemptTimes() + 1); failureList.add(record); - log.error("PopConsumerService revive backoff retry, record={}", record); + log.warn("PopConsumerService revive backoff retry, record={}", record); } else { log.error("PopConsumerService drop record, message may be lost, record={}", record); } @@ -505,9 +511,14 @@ public long revive(long currentTime, int maxCount) { this.popConsumerStore.writeRecords(new ArrayList<>(failureList)); this.popConsumerStore.deleteRecords(consumerRecords); - log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms", - popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(), - failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (brokerConfig.isEnablePopBufferMerge()) { + log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms", + popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(), + failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } else { + log.info("PopConsumerService, revive count={}, failure count={}, cost={}ms", + consumerRecords.size(), failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } return consumerRecords.size(); } @@ -695,6 +706,7 @@ public void run() { } } catch (Exception e) { log.error("PopConsumerService revive error", e); + this.waitForRunning(500); } } this.consumerRunning.set(false);