diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java index 9c940034a95..f2a617b4084 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java @@ -28,7 +28,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; -import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; @@ -63,7 +62,7 @@ protected void initOptions() { this.deleteOptions = new WriteOptions(); this.deleteOptions.setSync(false); this.deleteOptions.setLowPri(true); - this.deleteOptions.setDisableWAL(true); + this.deleteOptions.setDisableWAL(false); this.deleteOptions.setNoSlowdown(false); this.compactRangeOptions = new CompactRangeOptions(); @@ -83,15 +82,11 @@ protected boolean postLoad() { initOptions(); // init column family here - ColumnFamilyOptions defaultOptions = new ColumnFamilyOptions().optimizeForSmallDb(); - ColumnFamilyOptions popStateOptions = new ColumnFamilyOptions().optimizeForSmallDb(); + ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(); + ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(); this.cfOptions.add(defaultOptions); this.cfOptions.add(popStateOptions); - this.options = new DBOptions() - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); - List cfDescriptors = new ArrayList<>(); cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); cfDescriptors.add(new ColumnFamilyDescriptor(COLUMN_FAMILY_NAME, popStateOptions)); diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java index 2fac3bf485d..5687d6a222d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java @@ -131,6 +131,57 @@ public static ColumnFamilyOptions createOffsetCFOptions() { setInplaceUpdateSupport(true); } + public static ColumnFamilyOptions createPopCFOptions() { + BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig() + .setFormatVersion(5) + .setIndexType(IndexType.kBinarySearch) + .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) + .setDataBlockHashTableUtilRatio(0.75) + .setBlockSize(32 * SizeUnit.KB) + .setMetadataBlockSize(4 * SizeUnit.KB) + .setFilterPolicy(new BloomFilter(16, false)) + .setCacheIndexAndFilterBlocks(false) + .setCacheIndexAndFilterBlocksWithHighPriority(true) + .setPinL0FilterAndIndexBlocksInCache(false) + .setPinTopLevelIndexAndFilter(true) + .setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false)) + .setWholeKeyFiltering(true); + + CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal() + .setSizeRatio(100) + .setMaxSizeAmplificationPercent(25) + .setAllowTrivialMove(true) + .setMinMergeWidth(2) + .setMaxMergeWidth(Integer.MAX_VALUE) + .setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize) + .setCompressionSizePercent(-1); + + //noinspection resource + return new ColumnFamilyOptions() + .setMaxWriteBufferNumber(4) + .setWriteBufferSize(128 * SizeUnit.MB) + .setMinWriteBufferNumberToMerge(1) + .setTableFormatConfig(blockBasedTableConfig) + .setMemTableConfig(new SkipListMemTableConfig()) + .setCompressionType(CompressionType.NO_COMPRESSION) + .setBottommostCompressionType(CompressionType.NO_COMPRESSION) + .setNumLevels(7) + .setCompactionPriority(CompactionPriority.MinOverlappingRatio) + .setCompactionStyle(CompactionStyle.UNIVERSAL) + .setCompactionOptionsUniversal(compactionOption) + .setMaxCompactionBytes(100 * SizeUnit.GB) + .setSoftPendingCompactionBytesLimit(100 * SizeUnit.GB) + .setHardPendingCompactionBytesLimit(256 * SizeUnit.GB) + .setLevel0FileNumCompactionTrigger(2) + .setLevel0SlowdownWritesTrigger(8) + .setLevel0StopWritesTrigger(10) + .setTargetFileSizeBase(256 * SizeUnit.MB) + .setTargetFileSizeMultiplier(2) + .setMergeOperator(new StringAppendOperator()) + .setReportBgIoStats(true) + .setOptimizeFiltersForHits(true); + } + /** * Create a rocksdb db options, the user must take care to close it after closing db. * @return