Skip to content

Commit

Permalink
fix error log and speedup commit offset
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins committed Dec 20, 2024
1 parent 940bfdd commit a83babf
Showing 1 changed file with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
if (!context.isFifo() && result.getNextBeginOffset() > OFFSET_NOT_EXIST) {
this.brokerController.getConsumerOffsetManager().commitPullOffset(
context.getClientHost(), context.getGroupId(), topicId, queueId, result.getNextBeginOffset());
long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? offset : result.getNextBeginOffset();
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
long minOffset = popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId);
if (minOffset != OFFSET_NOT_EXIST) {
commitOffset = minOffset;
}
}
this.brokerController.getConsumerOffsetManager().commitOffset(
context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset);
}

return context;
Expand Down Expand Up @@ -308,9 +317,6 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po
CompletableFuture.completedFuture(popConsumerContext);

try {
GetMessageResult finalGetMessageResult = new GetMessageResult();
finalGetMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);

if (!fifo && preferRetry) {
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
Expand Down Expand Up @@ -493,7 +499,7 @@ public long revive(long currentTime, int maxCount) {
record.setInvisibleTime(record.getInvisibleTime() + backoffInterval);
record.setAttemptTimes(record.getAttemptTimes() + 1);
failureList.add(record);
log.error("PopConsumerService revive backoff retry, record={}", record);
log.warn("PopConsumerService revive backoff retry, record={}", record);
} else {
log.error("PopConsumerService drop record, message may be lost, record={}", record);
}
Expand All @@ -505,9 +511,14 @@ public long revive(long currentTime, int maxCount) {
this.popConsumerStore.writeRecords(new ArrayList<>(failureList));
this.popConsumerStore.deleteRecords(consumerRecords);

log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms",
popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(),
failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
if (brokerConfig.isEnablePopBufferMerge()) {
log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms",
popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(),
failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
} else {
log.info("PopConsumerService, revive count={}, failure count={}, cost={}ms",
consumerRecords.size(), failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

return consumerRecords.size();
}
Expand Down Expand Up @@ -695,6 +706,7 @@ public void run() {
}
} catch (Exception e) {
log.error("PopConsumerService revive error", e);
this.waitForRunning(500);
}
}
this.consumerRunning.set(false);
Expand Down

0 comments on commit a83babf

Please sign in to comment.