From f7b562829cf38376e4b99f4b224c7d0e42f0db21 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 29 Oct 2024 19:09:35 +0800 Subject: [PATCH] CreateFlushConsumeQueueService for loadAndStartConsumerServiceOnly --- .../rocketmq/common/config/AbstractRocksDBStorage.java | 1 + .../java/org/apache/rocketmq/store/DefaultMessageStore.java | 6 ++---- .../java/org/apache/rocketmq/store/RocksDBMessageStore.java | 6 +----- .../store/queue/RocksDBConsumeQueueOffsetTable.java | 2 +- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 48ba4b8086c2..e01cad081496 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -381,6 +381,7 @@ public synchronized boolean start() { public synchronized boolean shutdown() { try { if (!this.loaded) { + LOGGER.info("shutdown OK. {} is not loaded", this.dbPath); return true; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 9d3c46a438a8..c11fdd2804cf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -128,7 +128,7 @@ public class DefaultMessageStore implements MessageStore { protected final ConsumeQueueStoreInterface consumeQueueStore; - private final FlushConsumeQueueService flushConsumeQueueService; + protected final FlushConsumeQueueService flushConsumeQueueService; protected final CleanCommitLogService cleanCommitLogService; @@ -517,11 +517,9 @@ public void shutdown() { if (this.compactionService != null) { this.compactionService.shutdown(); } - - if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) { this.rocksDBMessageStore.consumeQueueStore.shutdown(); } - this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 321689ac8f56..83e86aab0771 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -58,11 +58,6 @@ public CleanConsumeQueueService createCleanConsumeQueueService() { return new RocksDBCleanConsumeQueueService(); } - @Override - public FlushConsumeQueueService createFlushConsumeQueueService() { - return new RocksDBFlushConsumeQueueService(); - } - @Override public CorrectLogicOffsetService createCorrectLogicOffsetService() { return new RocksDBCorrectLogicOffsetService(); @@ -198,6 +193,7 @@ public void loadAndStartConsumerServiceOnly() { throw new RuntimeException("load consume queue failed"); } super.loadCheckPoint(); + this.flushConsumeQueueService.start(); this.consumeQueueStore.start(); } catch (Exception e) { ERROR_LOG.error("loadAndStartConsumerServiceOnly error", e); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java index a91ae5e244ec..cb989852fb9a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java @@ -144,7 +144,7 @@ private void loadMaxConsumeQueueOffsets() { Function predicate = entry -> entry.type == OffsetEntryType.MAXIMUM; Consumer fn = entry -> { topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset); - ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); + log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); }; try { forEach(predicate, fn);