diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3d3ee86b8f1..741d324e3c2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1634,12 +1634,18 @@ private void swapRequests() { private void doCommit() { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush - boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - for (int i = 0; i < 2 && !flushOK; i++) { + boolean flushOK = false; + for (int i = 0; i < 1000; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + if (flushOK) { + break; + } else { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + } + } } req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); @@ -1846,7 +1852,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // Record ConsumeQueue information Long queueOffset = msgInner.getQueueOffset(); - // this msg maybe a inner-batch msg. + // this msg maybe an inner-batch msg. short messageNum = getMessageNum(msgInner); // Transaction messages that require special handling