From 810ca7e1f693685b9b4087c01f3a6c2f26ab908f Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Fri, 3 Nov 2023 12:00:38 +0800
Subject: [PATCH] [ISSUE #7545] [RIP-65] Support efficient random index for
massive messages
---
style/spotbugs-suppressions.xml | 2 +-
tieredstore/pom.xml | 14 +
.../tieredstore/TieredMessageFetcher.java | 103 ++--
.../file/CompositeQueueFlatFile.java | 29 +-
.../tieredstore/file/TieredConsumeQueue.java | 2 +-
.../tieredstore/file/TieredFlatFile.java | 5 +-
.../file/TieredFlatFileManager.java | 40 +-
.../tieredstore/file/TieredIndexFile.java | 470 -----------------
.../rocketmq/tieredstore/index/IndexFile.java | 35 ++
.../rocketmq/tieredstore/index/IndexItem.java | 114 ++++
.../tieredstore/index/IndexService.java | 62 +++
.../tieredstore/index/IndexStoreFile.java | 497 ++++++++++++++++++
.../tieredstore/index/IndexStoreService.java | 357 +++++++++++++
.../provider/TieredFileSegment.java | 9 +-
.../provider/TieredStoreProvider.java | 10 +-
.../provider/posix/PosixFileSegment.java | 3 +-
.../tieredstore/TieredMessageFetcherTest.java | 17 +-
.../tieredstore/file/TieredIndexFileTest.java | 93 ----
.../tieredstore/index/IndexItemTest.java | 91 ++++
.../tieredstore/index/IndexStoreFileTest.java | 278 ++++++++++
.../index/IndexStoreServiceBenchTest.java | 145 +++++
.../index/IndexStoreServiceTest.java | 310 +++++++++++
.../util/MessageBufferUtilTest.java | 1 -
.../src/test/resources/rmq.logback-test.xml | 15 +-
24 files changed, 2004 insertions(+), 698 deletions(-)
delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
delete mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
diff --git a/style/spotbugs-suppressions.xml b/style/spotbugs-suppressions.xml
index 5778695e1e4..6443e029faf 100644
--- a/style/spotbugs-suppressions.xml
+++ b/style/spotbugs-suppressions.xml
@@ -31,7 +31,7 @@
-
+
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index b2ea40bf3a3..9f2a8bf2283 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -53,5 +53,19 @@
commons-io
test
+
+
+ org.openjdk.jmh
+ jmh-core
+ 1.36
+ provided
+
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ 1.36
+ provided
+
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index c948fa3fa17..f739773eb34 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -50,7 +51,8 @@
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
+import org.apache.rocketmq.tieredstore.index.IndexItem;
+import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
@@ -58,7 +60,6 @@
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
public class TieredMessageFetcher implements MessageStoreFetcher {
@@ -555,85 +556,51 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
public CompletableFuture queryMessageAsync(
String topic, String key, int maxCount, long begin, long end) {
- TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
+ IndexService indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
- int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
long topicId;
try {
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
if (topicMetadata == null) {
- LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
+ LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
return CompletableFuture.completedFuture(new QueryMessageResult());
}
topicId = topicMetadata.getTopicId();
} catch (Exception e) {
- LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
+ LOGGER.error("MessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
return CompletableFuture.completedFuture(new QueryMessageResult());
}
- return indexFile.queryAsync(topic, key, begin, end)
- .thenCompose(indexBufferList -> {
- QueryMessageResult result = new QueryMessageResult();
- int resultCount = 0;
- List> futureList = new ArrayList<>(maxCount);
- for (Pair pair : indexBufferList) {
- Long fileBeginTimestamp = pair.getKey();
- ByteBuffer indexBuffer = pair.getValue();
-
- if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
- LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " +
- "index buffer size {} is not multiple of index item size {}",
- indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
- continue;
- }
-
- for (int indexOffset = indexBuffer.position();
- indexOffset < indexBuffer.limit();
- indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
-
- int indexItemHashCode = indexBuffer.getInt(indexOffset);
- if (indexItemHashCode != hashCode) {
- continue;
- }
-
- int indexItemTopicId = indexBuffer.getInt(indexOffset + 4);
- if (indexItemTopicId != topicId) {
- continue;
- }
-
- int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
- CompositeFlatFile flatFile =
- flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
- if (flatFile == null) {
- continue;
- }
-
- // decode index item
- long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
- int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
- int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
- long indexTimestamp = fileBeginTimestamp + timeDiff;
- if (indexTimestamp < begin || indexTimestamp > end) {
- continue;
- }
+ CompletableFuture> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end);
- CompletableFuture getMessageFuture = flatFile.getCommitLogAsync(offset, size)
- .thenAccept(messageBuffer -> result.addMessage(
- new SelectMappedBufferResult(0, messageBuffer, size, null)));
- futureList.add(getMessageFuture);
-
- resultCount++;
- if (resultCount >= maxCount) {
- break;
- }
- }
-
- if (resultCount >= maxCount) {
- break;
- }
+ return future.thenCompose(indexItemList -> {
+ QueryMessageResult result = new QueryMessageResult();
+ List> futureList = new ArrayList<>(maxCount);
+ for (IndexItem indexItem : indexItemList) {
+ if (topicId != indexItem.getTopicId()) {
+ continue;
}
- return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
- .thenApply(v -> result);
- });
+ CompositeFlatFile flatFile =
+ flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, indexItem.getQueueId()));
+ if (flatFile == null) {
+ continue;
+ }
+ CompletableFuture getMessageFuture = flatFile
+ .getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
+ .thenAccept(messageBuffer -> result.addMessage(
+ new SelectMappedBufferResult(
+ indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
+ futureList.add(getMessageFuture);
+ if (futureList.size() >= maxCount) {
+ break;
+ }
+ }
+ return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
+ }).whenComplete((result, throwable) -> {
+ if (result != null) {
+ LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
+ result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
+ }
+ });
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index 0a797f465f1..67d2cf06462 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.tieredstore.file;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
private final MessageQueue messageQueue;
private long topicSequenceNumber;
private QueueMetadata queueMetadata;
- private final TieredIndexFile indexFile;
+ private final IndexService indexStoreService;
public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
this.messageQueue = messageQueue;
this.recoverQueueMetadata();
- this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
+ this.indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
}
@Override
@@ -85,24 +89,15 @@ public AppendResult appendIndexFile(DispatchRequest request) {
return AppendResult.FILE_CLOSED;
}
+ Set keySet = new HashSet<>(
+ Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR)));
if (StringUtils.isNotBlank(request.getUniqKey())) {
- AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
- request.getUniqKey(), request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
- if (result != AppendResult.SUCCESS) {
- return result;
- }
+ keySet.add(request.getUniqKey());
}
- for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) {
- if (StringUtils.isNotBlank(key)) {
- AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
- key, request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
- if (result != AppendResult.SUCCESS) {
- return result;
- }
- }
- }
- return AppendResult.SUCCESS;
+ return indexStoreService.putKey(
+ messageQueue.getTopic(), (int) topicSequenceNumber, messageQueue.getQueueId(), keySet,
+ request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
}
public MessageQueue getMessageQueue() {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
index 35007f8cbfa..6953db032d6 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
@@ -20,9 +20,9 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.common.BoundaryType;
public class TieredConsumeQueue {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index d96eb6e8f3c..a41d562d108 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -141,7 +141,6 @@ public FileSegmentType getFileType() {
return fileType;
}
- @VisibleForTesting
public List getFileSegmentList() {
return fileSegmentList;
}
@@ -274,7 +273,7 @@ public int getFileSegmentCount() {
}
@Nullable
- protected TieredFileSegment getFileByIndex(int index) {
+ public TieredFileSegment getFileByIndex(int index) {
fileSegmentLock.readLock().lock();
try {
if (index < fileSegmentList.size()) {
@@ -354,7 +353,7 @@ protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryT
}
}
- protected List getFileListByTime(long beginTime, long endTime) {
+ public List getFileListByTime(long beginTime, long endTime) {
fileSegmentLock.readLock().lock();
try {
return fileSegmentList.stream()
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index 087ea8c9ce6..ffe0836f126 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -34,6 +34,8 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.index.IndexService;
+import org.apache.rocketmq.tieredstore.index.IndexStoreService;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -43,7 +45,7 @@ public class TieredFlatFileManager {
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
private static volatile TieredFlatFileManager instance;
- private static volatile TieredIndexFile indexFile;
+ private static volatile IndexStoreService indexStoreService;
private final TieredMetadataStore metadataStore;
private final TieredMessageStoreConfig storeConfig;
@@ -76,25 +78,26 @@ public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeCo
return instance;
}
- public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) {
+ public static IndexService getTieredIndexService(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
- return indexFile;
+ return indexStoreService;
}
- if (indexFile == null) {
+ if (indexStoreService == null) {
synchronized (TieredFlatFileManager.class) {
- if (indexFile == null) {
+ if (indexStoreService == null) {
try {
String filePath = TieredStoreUtil.toPath(new MessageQueue(
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
- indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
+ indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), filePath);
+ indexStoreService.start();
} catch (Exception e) {
logger.error("Construct FlatFileManager indexFile error", e);
}
}
}
}
- return indexFile;
+ return indexStoreService;
}
public void doCommit() {
@@ -120,15 +123,6 @@ public void doCommit() {
}
}, delay, TimeUnit.MILLISECONDS);
}
- TieredStoreExecutor.commitExecutor.schedule(() -> {
- try {
- if (indexFile != null) {
- indexFile.commit(true);
- }
- } catch (Throwable e) {
- logger.error("Commit indexFile periodically failed", e);
- }
- }, 0, TimeUnit.MILLISECONDS);
}
public void doCleanExpiredFile() {
@@ -148,10 +142,6 @@ public void doCleanExpiredFile() {
}
});
}
- if (indexFile != null) {
- indexFile.cleanExpiredFile(expiredTimeStamp);
- indexFile.destroyExpiredFile();
- }
}
private void doScheduleTask() {
@@ -244,7 +234,7 @@ public void cleanup() {
private static void cleanStaticReference() {
instance = null;
- indexFile = null;
+ indexStoreService = null;
}
@Nullable
@@ -271,8 +261,8 @@ public ImmutableList deepCopyFlatFileToList() {
}
public void shutdown() {
- if (indexFile != null) {
- indexFile.commit(true);
+ if (indexStoreService != null) {
+ indexStoreService.shutdown();
}
for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) {
flatFile.shutdown();
@@ -280,8 +270,8 @@ public void shutdown() {
}
public void destroy() {
- if (indexFile != null) {
- indexFile.destroy();
+ if (indexStoreService != null) {
+ indexStoreService.destroy();
}
ImmutableList flatFileList = deepCopyFlatFileToList();
cleanup();
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
deleted file mode 100644
index eda5e010657..00000000000
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.file;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.index.IndexHeader;
-import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
-import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-
-public class TieredIndexFile {
-
- private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
-
- // header format:
- // magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4)
- public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
- public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
- public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
- public static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
- public static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
- public static final int INDEX_FILE_HEADER_SIZE = 28;
-
- // index item
- public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
- public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
- public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
- public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
- public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
-
- private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
- private static final String CUR_INDEX_FILE_NAME = "0000";
- private static final String PRE_INDEX_FILE_NAME = "1111";
- private static final String COMPACT_FILE_NAME = "2222";
-
- private final TieredMessageStoreConfig storeConfig;
- private final TieredFlatFile flatFile;
- private final int maxHashSlotNum;
- private final int maxIndexNum;
- private final int fileMaxSize;
- private final String curFilePath;
- private final String preFilepath;
- private MappedFile preMappedFile;
- private MappedFile curMappedFile;
-
- private final ReentrantLock curFileLock = new ReentrantLock();
- private Future inflightCompactFuture = CompletableFuture.completedFuture(null);
-
- protected TieredIndexFile(TieredFileAllocator fileQueueFactory, String filePath) throws IOException {
- this.storeConfig = fileQueueFactory.getStoreConfig();
- this.flatFile = fileQueueFactory.createFlatFileForIndexFile(filePath);
- if (flatFile.getBaseOffset() == -1) {
- flatFile.setBaseOffset(0);
- }
- this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
- this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
- this.fileMaxSize = IndexHeader.INDEX_HEADER_SIZE
- + this.maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
- + this.maxIndexNum * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE
- + 4;
- this.curFilePath = Paths.get(
- storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, CUR_INDEX_FILE_NAME).toString();
- this.preFilepath = Paths.get(
- storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, PRE_INDEX_FILE_NAME).toString();
- initFile();
- TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(
- this::doScheduleTask, 10, 10, TimeUnit.SECONDS);
- }
-
- protected void doScheduleTask() {
- try {
- curFileLock.lock();
- try {
- synchronized (TieredIndexFile.class) {
- MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
- int indexNum = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
- long lastIndexTime = mappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
- if (indexNum > 0 &&
- System.currentTimeMillis() - lastIndexTime >
- storeConfig.getTieredStoreIndexFileRollingIdleInterval()) {
- mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
- rollingFile();
- }
- if (inflightCompactFuture.isDone() && preMappedFile != null && preMappedFile.isAvailable()) {
- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(
- new CompactTask(storeConfig, preMappedFile, flatFile), null);
- }
- }
- } finally {
- curFileLock.unlock();
- }
- } catch (Throwable throwable) {
- logger.error("TieredIndexFile: submit compact index file task failed:", throwable);
- }
- }
-
- private static boolean isFileSealed(MappedFile mappedFile) {
- return mappedFile.getMappedByteBuffer().getInt(mappedFile.getFileSize() - 4) == INDEX_FILE_END_MAGIC_CODE;
- }
-
- private void initIndexFileHeader(MappedFile mappedFile) {
- MappedByteBuffer mappedByteBuffer = mappedFile.getMappedByteBuffer();
- if (mappedByteBuffer.getInt(0) != INDEX_FILE_BEGIN_MAGIC_CODE) {
- mappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
- mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, -1L);
- mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, -1L);
- mappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 0);
- mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 0);
- for (int i = 0; i < maxHashSlotNum; i++) {
- mappedByteBuffer.putInt(INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE, -1);
- }
- mappedByteBuffer.putInt(fileMaxSize - 4, -1);
- }
- }
-
- @VisibleForTesting
- public MappedFile getPreMappedFile() {
- return preMappedFile;
- }
-
- private void initFile() throws IOException {
- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
- initIndexFileHeader(curMappedFile);
- File preFile = new File(preFilepath);
- boolean preFileExists = preFile.exists();
- if (preFileExists) {
- preMappedFile = new DefaultMappedFile(preFilepath, fileMaxSize);
- }
-
- if (isFileSealed(curMappedFile)) {
- if (preFileExists) {
- if (preFile.delete()) {
- logger.info("Pre IndexFile deleted success", preFilepath);
- } else {
- logger.error("Pre IndexFile deleted failed", preFilepath);
- }
- }
- boolean rename = curMappedFile.renameTo(preFilepath);
- if (rename) {
- preMappedFile = curMappedFile;
- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
- initIndexFileHeader(curMappedFile);
- preFileExists = true;
- }
- }
-
- if (preFileExists) {
- synchronized (TieredIndexFile.class) {
- if (inflightCompactFuture.isDone()) {
- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(
- new CompactTask(storeConfig, preMappedFile, flatFile), null);
- }
- }
- }
- }
-
- public AppendResult append(MessageQueue mq, int topicId, String key, long offset, int size, long timeStamp) {
- return putKey(mq, topicId, indexKeyHashMethod(buildKey(mq.getTopic(), key)), offset, size, timeStamp);
- }
-
- private boolean rollingFile() throws IOException {
- File preFile = new File(preFilepath);
- boolean preFileExists = preFile.exists();
- if (!preFileExists) {
- boolean rename = curMappedFile.renameTo(preFilepath);
- if (rename) {
- preMappedFile = curMappedFile;
- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
- initIndexFileHeader(curMappedFile);
- tryToCompactPreFile();
- return true;
- } else {
- logger.error("TieredIndexFile#rollingFile: rename current file failed");
- return false;
- }
- }
- tryToCompactPreFile();
- return false;
- }
-
- private void tryToCompactPreFile() throws IOException {
- synchronized (TieredIndexFile.class) {
- if (inflightCompactFuture.isDone()) {
- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null);
- }
- }
- }
-
- private AppendResult putKey(MessageQueue mq, int topicId, int hashCode, long offset, int size, long timeStamp) {
- curFileLock.lock();
- try {
- if (isFileSealed(curMappedFile) && !rollingFile()) {
- return AppendResult.FILE_FULL;
- }
-
- MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
-
- int slotPosition = hashCode % maxHashSlotNum;
- int slotOffset = INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE;
-
- int slotValue = mappedByteBuffer.getInt(slotOffset);
-
- long beginTimeStamp = mappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
- if (beginTimeStamp == -1) {
- mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, timeStamp);
- beginTimeStamp = timeStamp;
- }
-
- int indexCount = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
- int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
- + indexCount * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
-
- int timeDiff = (int) (timeStamp - beginTimeStamp);
-
- // put hash index
- mappedByteBuffer.putInt(indexOffset, hashCode);
- mappedByteBuffer.putInt(indexOffset + 4, topicId);
- mappedByteBuffer.putInt(indexOffset + 4 + 4, mq.getQueueId());
- mappedByteBuffer.putLong(indexOffset + 4 + 4 + 4, offset);
- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8, size);
- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4, timeDiff);
- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4, slotValue);
-
- // put hash slot
- mappedByteBuffer.putInt(slotOffset, indexCount);
-
- // put header
- indexCount += 1;
- mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, indexCount);
- mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, timeStamp);
- if (indexCount == maxIndexNum) {
- mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
- rollingFile();
- }
- return AppendResult.SUCCESS;
- } catch (Exception e) {
- logger.error("TieredIndexFile#putKey: put key failed:", e);
- return AppendResult.IO_ERROR;
- } finally {
- curFileLock.unlock();
- }
- }
-
- public CompletableFuture>> queryAsync(String topic, String key, long beginTime,
- long endTime) {
- int hashCode = indexKeyHashMethod(buildKey(topic, key));
- int slotPosition = hashCode % maxHashSlotNum;
- List fileSegmentList = flatFile.getFileListByTime(beginTime, endTime);
- CompletableFuture>> future = null;
- for (int i = fileSegmentList.size() - 1; i >= 0; i--) {
- TieredFileSegment fileSegment = fileSegmentList.get(i);
- CompletableFuture tmpFuture = fileSegment.readAsync(INDEX_FILE_HEADER_SIZE + (long) slotPosition * INDEX_FILE_HASH_SLOT_SIZE, INDEX_FILE_HASH_SLOT_SIZE)
- .thenCompose(slotBuffer -> {
- int indexPosition = slotBuffer.getInt();
- if (indexPosition == -1) {
- return CompletableFuture.completedFuture(null);
- }
-
- int indexSize = slotBuffer.getInt();
- if (indexSize <= 0) {
- return CompletableFuture.completedFuture(null);
- }
- return fileSegment.readAsync(indexPosition, indexSize);
- });
- if (future == null) {
- future = tmpFuture.thenApply(indexBuffer -> {
- List> result = new ArrayList<>();
- if (indexBuffer != null) {
- result.add(Pair.of(fileSegment.getMinTimestamp(), indexBuffer));
- }
- return result;
- });
- } else {
- future = future.thenCombine(tmpFuture, (indexList, indexBuffer) -> {
- if (indexBuffer != null) {
- indexList.add(Pair.of(fileSegment.getMinTimestamp(), indexBuffer));
- }
- return indexList;
- });
- }
- }
- return future == null ? CompletableFuture.completedFuture(new ArrayList<>()) : future;
- }
-
- public static String buildKey(String topic, String key) {
- return topic + "#" + key;
- }
-
- public static int indexKeyHashMethod(String key) {
- int keyHash = key.hashCode();
- int keyHashPositive = Math.abs(keyHash);
- if (keyHashPositive < 0)
- keyHashPositive = 0;
- return keyHashPositive;
- }
-
- public void commit(boolean sync) {
- flatFile.commit(sync);
- if (sync) {
- try {
- inflightCompactFuture.get();
- } catch (Exception ignore) {
- }
- }
- }
-
- public void cleanExpiredFile(long expireTimestamp) {
- flatFile.cleanExpiredFile(expireTimestamp);
- }
-
- public void destroyExpiredFile() {
- flatFile.destroyExpiredFile();
- }
-
- public void destroy() {
- inflightCompactFuture.cancel(true);
- if (preMappedFile != null) {
- preMappedFile.destroy(-1);
- }
- if (curMappedFile != null) {
- curMappedFile.destroy(-1);
- }
- String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
- File compactFile = new File(compactFilePath);
- if (compactFile.exists()) {
- compactFile.delete();
- }
- flatFile.destroy();
- }
-
- static class CompactTask implements Runnable {
- private final TieredMessageStoreConfig storeConfig;
-
- private final int maxHashSlotNum;
- private final int maxIndexNum;
- private final int fileMaxSize;
- private MappedFile originFile;
- private TieredFlatFile fileQueue;
- private MappedFile compactFile;
-
- public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile,
- TieredFlatFile fileQueue) throws IOException {
- this.storeConfig = storeConfig;
- this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
- this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
- this.originFile = originFile;
- this.fileQueue = fileQueue;
- String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
- fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (storeConfig.getTieredStoreIndexFileMaxHashSlotNum() * INDEX_FILE_HASH_SLOT_SIZE) + (storeConfig.getTieredStoreIndexFileMaxIndexNum() * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
- // TODO check magic code, upload immediately when compact complete
- File compactFile = new File(compactFilePath);
- if (compactFile.exists()) {
- compactFile.delete();
- }
- this.compactFile = new DefaultMappedFile(compactFilePath, fileMaxSize);
- }
-
- @Override
- public void run() {
- try {
- compact();
- } catch (Throwable throwable) {
- logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable);
- }
-
- try {
- if (originFile != null) {
- originFile.destroy(-1);
- }
- if (compactFile != null) {
- compactFile.destroy(-1);
- }
- } catch (Throwable throwable) {
- logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable);
- }
- }
-
- public void compact() {
- if (!isFileSealed(originFile)) {
- logger.error("[Bug]TieredIndexFile#CompactTask#compact: try to compact unsealed file");
- originFile.destroy(-1);
- compactFile.destroy(-1);
- return;
- }
-
- buildCompactFile();
- fileQueue.append(compactFile.getMappedByteBuffer());
- fileQueue.commit(true);
- compactFile.destroy(-1);
- originFile.destroy(-1);
- compactFile = null;
- originFile = null;
- }
-
- private void buildCompactFile() {
- MappedByteBuffer originMappedByteBuffer = originFile.getMappedByteBuffer();
- MappedByteBuffer compactMappedByteBuffer = compactFile.getMappedByteBuffer();
- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
- compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION));
- compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION));
- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, maxHashSlotNum);
- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, originMappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION));
-
- int rePutSlotValue = INDEX_FILE_HEADER_SIZE + (maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE);
- for (int i = 0; i < maxHashSlotNum; i++) {
- int slotOffset = INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE;
- int slotValue = originMappedByteBuffer.getInt(slotOffset);
- if (slotValue != -1) {
- int indexTotalSize = 0;
- int indexPosition = slotValue;
-
- while (indexPosition >= 0 && indexPosition < maxIndexNum) {
- int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
- + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
- int rePutIndexOffset = rePutSlotValue + indexTotalSize;
-
- compactMappedByteBuffer.putInt(rePutIndexOffset, originMappedByteBuffer.getInt(indexOffset));
- compactMappedByteBuffer.putInt(rePutIndexOffset + 4, originMappedByteBuffer.getInt(indexOffset + 4));
- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4));
- compactMappedByteBuffer.putLong(rePutIndexOffset + 4 + 4 + 4, originMappedByteBuffer.getLong(indexOffset + 4 + 4 + 4));
- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8));
- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4));
-
- indexTotalSize += INDEX_FILE_HASH_COMPACT_INDEX_SIZE;
- indexPosition = originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4);
- }
- compactMappedByteBuffer.putInt(slotOffset, rePutSlotValue);
- compactMappedByteBuffer.putInt(slotOffset + 4, indexTotalSize);
- rePutSlotValue += indexTotalSize;
- }
- }
- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_END_MAGIC_CODE);
- compactMappedByteBuffer.putInt(rePutSlotValue, INDEX_FILE_BEGIN_MAGIC_CODE);
- compactMappedByteBuffer.limit(rePutSlotValue + 4);
- }
- }
-}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
new file mode 100644
index 00000000000..d131b9b53ea
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import java.nio.ByteBuffer;
+
+public interface IndexFile extends IndexService {
+
+ /**
+ * Enumeration for the status of the index file.
+ */
+ enum IndexStatusEnum {
+ SHUTDOWN, UNSEALED, SEALED, UPLOAD
+ }
+
+ long getTimestamp();
+
+ IndexStatusEnum getFileStatus();
+
+ ByteBuffer doCompaction();
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
new file mode 100644
index 00000000000..24ccc4322fa
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import java.nio.ByteBuffer;
+
+public class IndexItem {
+
+ public static final int INDEX_ITEM_SIZE = 32;
+ public static final int COMPACT_INDEX_ITEM_SIZE = 28;
+
+ private final int hashCode;
+ private final int topicId;
+ private final int queueId;
+ private final long offset;
+ private final int size;
+ private final int timeDiff;
+ private final int itemIndex;
+
+ public IndexItem(int topicId, int queueId, long offset, int size, int hashCode, int timeDiff, int itemIndex) {
+ this.hashCode = hashCode;
+ this.topicId = topicId;
+ this.queueId = queueId;
+ this.offset = offset;
+ this.size = size;
+ this.timeDiff = timeDiff;
+ this.itemIndex = itemIndex;
+ }
+
+ public IndexItem(byte[] bytes) {
+ if (bytes == null ||
+ bytes.length != INDEX_ITEM_SIZE &&
+ bytes.length != COMPACT_INDEX_ITEM_SIZE) {
+ throw new IllegalArgumentException("Byte array length not correct");
+ }
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ hashCode = byteBuffer.getInt(0);
+ topicId = byteBuffer.getInt(4);
+ queueId = byteBuffer.getInt(8);
+ offset = byteBuffer.getLong(12);
+ size = byteBuffer.getInt(20);
+ timeDiff = byteBuffer.getInt(24);
+ itemIndex = bytes.length == INDEX_ITEM_SIZE ? byteBuffer.getInt(28) : 0;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(32);
+ byteBuffer.putInt(0, hashCode);
+ byteBuffer.putInt(4, topicId);
+ byteBuffer.putInt(8, queueId);
+ byteBuffer.putLong(12, offset);
+ byteBuffer.putInt(20, size);
+ byteBuffer.putInt(24, timeDiff);
+ byteBuffer.putInt(28, itemIndex);
+ return byteBuffer;
+ }
+
+ public int getHashCode() {
+ return hashCode;
+ }
+
+ public int getTopicId() {
+ return topicId;
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public int getTimeDiff() {
+ return timeDiff;
+ }
+
+ public int getItemIndex() {
+ return itemIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "IndexItem{" +
+ "hashCode=" + hashCode +
+ ", topicId=" + topicId +
+ ", queueId=" + queueId +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", timeDiff=" + timeDiff +
+ ", position=" + itemIndex +
+ '}';
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
new file mode 100644
index 00000000000..d4eb854a2e8
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+
+public interface IndexService {
+
+ /**
+ * Puts a key into the index.
+ *
+ * @param topic The topic of the key.
+ * @param topicId The ID of the topic.
+ * @param queueId The ID of the queue.
+ * @param keySet The set of keys to be indexed.
+ * @param offset The offset value of the key.
+ * @param size The size of the key.
+ * @param timestamp The timestamp of the key.
+ * @return The result of the put operation.
+ */
+ AppendResult putKey(
+ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp);
+
+ /**
+ * Asynchronously queries the index for a specific key within a given time range.
+ *
+ * @param topic The topic of the key.
+ * @param key The key to be queried.
+ * @param beginTime The start time of the query range.
+ * @param endTime The end time of the query range.
+ * @return A CompletableFuture that holds the list of IndexItems matching the query.
+ */
+ CompletableFuture> queryAsync(String topic, String key, int maxCount, long beginTime, long endTime);
+
+ /**
+ * Shutdown the index service.
+ */
+ void shutdown();
+
+ /**
+ * Destroys the index service and releases all resources.
+ */
+ void destroy();
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
new file mode 100644
index 00000000000..def5c8f2d06
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import com.google.common.base.Stopwatch;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.SEALED;
+import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UNSEALED;
+import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UPLOAD;
+import static org.apache.rocketmq.tieredstore.index.IndexItem.COMPACT_INDEX_ITEM_SIZE;
+import static org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_COMPACTED_DIRECTORY_NAME;
+import static org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_DIRECTORY_NAME;
+
+/**
+ * a single IndexFile in indexService
+ */
+public class IndexStoreFile implements IndexFile {
+
+ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ /**
+ * header format:
+ * magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4)
+ */
+ public static final int INDEX_MAGIC_CODE = 0;
+ public static final int INDEX_BEGIN_TIME_STAMP = 4;
+ public static final int INDEX_END_TIME_STAMP = 12;
+ public static final int INDEX_SLOT_COUNT = 20;
+ public static final int INDEX_ITEM_INDEX = 24;
+ public static final int INDEX_HEADER_SIZE = 28;
+
+ public static final int BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
+ public static final int END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
+
+ /**
+ * hash slot
+ */
+ private static final int INVALID_INDEX = 0;
+ private static final int HASH_SLOT_SIZE = Long.BYTES;
+ private static final int MAX_QUERY_COUNT = 512;
+
+ private final int hashSlotMaxCount;
+ private final int indexItemMaxCount;
+
+ private final ReadWriteLock fileReadWriteLock;
+ private final AtomicReference fileStatus;
+ private final AtomicLong beginTimestamp = new AtomicLong(-1L);
+ private final AtomicLong endTimestamp = new AtomicLong(-1L);
+ private final AtomicInteger hashSlotCount = new AtomicInteger(0);
+ private final AtomicInteger indexItemCount = new AtomicInteger(0);
+
+ private MappedFile mappedFile;
+ private ByteBuffer byteBuffer;
+ private MappedFile compactMappedFile;
+ private TieredFileSegment fileSegment;
+
+ public IndexStoreFile(TieredMessageStoreConfig storeConfig, long timestamp) throws IOException {
+ this.hashSlotMaxCount = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+ this.indexItemMaxCount = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+ this.fileStatus = new AtomicReference<>(UNSEALED);
+ this.fileReadWriteLock = new ReentrantReadWriteLock();
+ this.mappedFile = new DefaultMappedFile(
+ Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString(),
+ this.getItemPosition(indexItemMaxCount));
+ this.byteBuffer = this.mappedFile.getMappedByteBuffer();
+
+ this.beginTimestamp.set(timestamp);
+ this.endTimestamp.set(byteBuffer.getLong(INDEX_BEGIN_TIME_STAMP));
+ this.hashSlotCount.set(byteBuffer.getInt(INDEX_SLOT_COUNT));
+ this.indexItemCount.set(byteBuffer.getInt(INDEX_ITEM_INDEX));
+ this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1);
+ }
+
+ public IndexStoreFile(TieredMessageStoreConfig storeConfig, TieredFileSegment fileSegment) {
+ this.fileSegment = fileSegment;
+ this.fileStatus = new AtomicReference<>(UPLOAD);
+ this.fileReadWriteLock = new ReentrantReadWriteLock();
+
+ this.beginTimestamp.set(fileSegment.getMinTimestamp());
+ this.endTimestamp.set(fileSegment.getMaxTimestamp());
+ this.hashSlotCount.set(storeConfig.getTieredStoreIndexFileMaxHashSlotNum());
+ this.indexItemCount.set(storeConfig.getTieredStoreIndexFileMaxIndexNum());
+ this.hashSlotMaxCount = hashSlotCount.get();
+ this.indexItemMaxCount = indexItemCount.get();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return this.beginTimestamp.get();
+ }
+
+ public long getEndTimestamp() {
+ return this.endTimestamp.get();
+ }
+
+ public long getHashSlotCount() {
+ return this.hashSlotCount.get();
+ }
+
+ public long getIndexItemCount() {
+ return this.indexItemCount.get();
+ }
+
+ @Override
+ public IndexStatusEnum getFileStatus() {
+ return this.fileStatus.get();
+ }
+
+ protected String buildKey(String topic, String key) {
+ return String.format("%s#%s", topic, key);
+ }
+
+ protected int hashCode(String keyStr) {
+ int keyHash = keyStr.hashCode();
+ return (keyHash < 0) ? -keyHash : keyHash;
+ }
+
+ protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) {
+ byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
+ byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
+ byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
+ byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
+ byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
+ }
+
+ protected int getSlotPosition(int slotIndex) {
+ return INDEX_HEADER_SIZE + slotIndex * HASH_SLOT_SIZE;
+ }
+
+ protected int getSlotValue(int slotPosition) {
+ return Math.max(this.byteBuffer.getInt(slotPosition), INVALID_INDEX);
+ }
+
+ protected int getItemPosition(int itemIndex) {
+ return INDEX_HEADER_SIZE + hashSlotMaxCount * HASH_SLOT_SIZE + itemIndex * IndexItem.INDEX_ITEM_SIZE;
+ }
+
+ @Override
+ public AppendResult putKey(
+ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp) {
+
+ if (StringUtils.isBlank(topic)) {
+ return AppendResult.UNKNOWN_ERROR;
+ }
+
+ if (keySet == null || keySet.isEmpty()) {
+ return AppendResult.SUCCESS;
+ }
+
+ try {
+ fileReadWriteLock.writeLock().lock();
+
+ if (!UNSEALED.equals(fileStatus.get())) {
+ return AppendResult.FILE_FULL;
+ }
+
+ if (this.indexItemCount.get() + keySet.size() >= this.indexItemMaxCount) {
+ this.fileStatus.set(IndexStatusEnum.SEALED);
+ return AppendResult.FILE_FULL;
+ }
+
+ for (String key : keySet) {
+ int hashCode = this.hashCode(this.buildKey(topic, key));
+ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
+ int slotOldValue = this.getSlotValue(slotPosition);
+ int timeDiff = (int) ((timestamp - this.beginTimestamp.get()) / 1000L);
+
+ IndexItem indexItem = new IndexItem(
+ topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue);
+ int itemIndex = this.indexItemCount.incrementAndGet();
+ this.byteBuffer.position(this.getItemPosition(itemIndex));
+ this.byteBuffer.put(indexItem.getByteBuffer());
+ this.byteBuffer.putInt(slotPosition, itemIndex);
+
+ if (slotOldValue <= INVALID_INDEX) {
+ this.hashSlotCount.incrementAndGet();
+ }
+ if (this.endTimestamp.get() < timestamp) {
+ this.endTimestamp.set(timestamp);
+ }
+ this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1);
+
+ log.trace("IndexStoreFile put key, timestamp: {}, topic: {}, key: {}, slot: {}, item: {}, previous item: {}, content: {}",
+ this.getTimestamp(), topic, key, hashCode % this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
+ }
+ return AppendResult.SUCCESS;
+ } catch (Exception e) {
+ log.error("IndexStoreFile put key error, topic: {}, topicId: {}, queueId: {}, keySet: {}, offset: {}, " +
+ "size: {}, timestamp: {}", topic, topicId, queueId, keySet, offset, size, timestamp, e);
+ } finally {
+ fileReadWriteLock.writeLock().unlock();
+ }
+
+ return AppendResult.UNKNOWN_ERROR;
+ }
+
+ @Override
+ public CompletableFuture> queryAsync(
+ String topic, String key, int maxCount, long beginTime, long endTime) {
+
+ switch (this.fileStatus.get()) {
+ case UNSEALED:
+ case SEALED:
+ return this.queryAsyncFromUnsealedFile(buildKey(topic, key), maxCount, beginTime, endTime);
+ case UPLOAD:
+ return this.queryAsyncFromSegmentFile(buildKey(topic, key), maxCount, beginTime, endTime);
+ case SHUTDOWN:
+ default:
+ return CompletableFuture.completedFuture(new ArrayList<>());
+ }
+ }
+
+ protected CompletableFuture> queryAsyncFromUnsealedFile(
+ String key, int maxCount, long beginTime, long endTime) {
+
+ return CompletableFuture.supplyAsync(() -> {
+ List result = new ArrayList<>();
+ try {
+ fileReadWriteLock.readLock().lock();
+ if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
+ return result;
+ }
+
+ if (mappedFile == null || !mappedFile.hold()) {
+ return result;
+ }
+
+ int hashCode = this.hashCode(key);
+ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
+ int slotValue = this.getSlotValue(slotPosition);
+
+ int left = MAX_QUERY_COUNT;
+ while (left > 0 &&
+ slotValue > INVALID_INDEX &&
+ slotValue <= this.indexItemCount.get()) {
+
+ byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
+ ByteBuffer buffer = this.byteBuffer.duplicate();
+ buffer.position(this.getItemPosition(slotValue));
+ buffer.get(bytes);
+ IndexItem indexItem = new IndexItem(bytes);
+ if (hashCode == indexItem.getHashCode()) {
+ result.add(indexItem);
+ if (result.size() > maxCount) {
+ break;
+ }
+ }
+ slotValue = indexItem.getItemIndex();
+ left--;
+ }
+
+ log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
+ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
+ } catch (Exception e) {
+ log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
+ "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
+ } finally {
+ fileReadWriteLock.readLock().unlock();
+ mappedFile.release();
+ }
+ return result;
+ }, TieredStoreExecutor.fetchDataExecutor);
+ }
+
+ protected CompletableFuture> queryAsyncFromSegmentFile(
+ String key, int maxCount, long beginTime, long endTime) {
+
+ if (this.fileSegment == null || !UPLOAD.equals(this.fileStatus.get())) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ int hashCode = this.hashCode(key);
+ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
+
+ CompletableFuture> future = this.fileSegment.readAsync(slotPosition, HASH_SLOT_SIZE)
+ .thenCompose(slotBuffer -> {
+ if (slotBuffer.remaining() < HASH_SLOT_SIZE) {
+ log.error("IndexStoreFile query from tiered storage return error slot buffer, " +
+ "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime);
+ return CompletableFuture.completedFuture(null);
+ }
+ int indexPosition = slotBuffer.getInt();
+ int indexTotalSize = Math.min(slotBuffer.getInt(), COMPACT_INDEX_ITEM_SIZE * 1024);
+ if (indexPosition <= INVALID_INDEX || indexTotalSize <= 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return this.fileSegment.readAsync(indexPosition, indexTotalSize);
+ })
+ .thenApply(itemBuffer -> {
+ List result = new ArrayList<>();
+ if (itemBuffer == null) {
+ return result;
+ }
+
+ if (itemBuffer.remaining() % COMPACT_INDEX_ITEM_SIZE != 0) {
+ log.error("IndexStoreFile query from tiered storage return error item buffer, " +
+ "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime);
+ return result;
+ }
+
+ int size = itemBuffer.remaining() / COMPACT_INDEX_ITEM_SIZE;
+ byte[] bytes = new byte[COMPACT_INDEX_ITEM_SIZE];
+ for (int i = 0; i < size; i++) {
+ itemBuffer.get(bytes);
+ IndexItem indexItem = new IndexItem(bytes);
+ long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
+ if (hashCode == indexItem.getHashCode() &&
+ beginTime <= storeTimestamp && storeTimestamp <= endTime &&
+ result.size() < maxCount) {
+ result.add(indexItem);
+ }
+ }
+ return result;
+ });
+
+ return future.whenComplete((result, throwable) -> {
+ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ if (throwable != null) {
+ log.error("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, " +
+ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ costTime, getTimestamp(), key, hashCode, maxCount, beginTime, endTime, throwable);
+ } else {
+ String details = Optional.ofNullable(result)
+ .map(r -> r.stream()
+ .map(item -> String.format("%d-%d", item.getQueueId(), item.getOffset()))
+ .collect(Collectors.joining(", ")))
+ .orElse("");
+
+ log.debug("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, result size: {}, ({}), " +
+ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ costTime, getTimestamp(), result != null ? result.size() : 0, details, key, hashCode, maxCount, beginTime, endTime);
+ }
+ });
+ }
+
+ @Override
+ public ByteBuffer doCompaction() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ ByteBuffer buffer;
+ try {
+ buffer = compactToNewFile();
+ log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms",
+ this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ } catch (Exception e) {
+ log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms",
+ this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e);
+ return null;
+ }
+
+ try {
+ // Make sure there is no read request here
+ fileReadWriteLock.writeLock().lock();
+ fileStatus.set(IndexStatusEnum.SEALED);
+ } catch (Exception e) {
+ log.error("IndexStoreFile change file status to sealed error, timestamp={}", this.getTimestamp());
+ } finally {
+ fileReadWriteLock.writeLock().unlock();
+ }
+ return buffer;
+ }
+
+ protected String getCompactedFilePath() {
+ return Paths.get(this.mappedFile.getFileName()).getParent()
+ .resolve(FILE_COMPACTED_DIRECTORY_NAME)
+ .resolve(String.valueOf(this.getTimestamp())).toString();
+ }
+
+ protected ByteBuffer compactToNewFile() throws IOException {
+
+ byte[] payload = new byte[IndexItem.INDEX_ITEM_SIZE];
+ ByteBuffer payloadBuffer = ByteBuffer.wrap(payload);
+ int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount * HASH_SLOT_SIZE);
+ int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE * indexItemCount.get();
+
+ compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), fileMaxLength);
+ MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer();
+
+ for (int i = 0; i < hashSlotMaxCount; i++) {
+ int slotPosition = this.getSlotPosition(i);
+ int slotValue = this.getSlotValue(slotPosition);
+ int writeBeginPosition = writePosition;
+
+ while (slotValue > INVALID_INDEX && writePosition < fileMaxLength) {
+ ByteBuffer buffer = this.byteBuffer.duplicate();
+ buffer.position(this.getItemPosition(slotValue));
+ buffer.get(payload);
+ int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
+ buffer.limit(COMPACT_INDEX_ITEM_SIZE);
+ newBuffer.position(writePosition);
+ newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
+ log.trace("IndexStoreFile do compaction, write item, slot: {}, current: {}, next: {}", i, slotValue, newSlotValue);
+ slotValue = newSlotValue;
+ writePosition += COMPACT_INDEX_ITEM_SIZE;
+ }
+
+ int length = writePosition - writeBeginPosition;
+ newBuffer.putInt(slotPosition, writeBeginPosition);
+ newBuffer.putInt(slotPosition + Integer.BYTES, length);
+
+ if (length > 0) {
+ log.trace("IndexStoreFile do compaction, write slot, slot: {}, begin: {}, length: {}", i, writeBeginPosition, length);
+ }
+ }
+
+ this.flushNewMetadata(newBuffer, true);
+ newBuffer.flip();
+ return newBuffer;
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ fileReadWriteLock.writeLock().lock();
+ this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
+ if (this.mappedFile != null) {
+ this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+ }
+ if (this.compactMappedFile != null) {
+ this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+ }
+ } catch (Exception e) {
+ log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
+ } finally {
+ fileReadWriteLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ fileReadWriteLock.writeLock().lock();
+ this.shutdown();
+ switch (this.fileStatus.get()) {
+ case SHUTDOWN:
+ case UNSEALED:
+ case SEALED:
+ if (this.mappedFile != null) {
+ this.mappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
+ }
+ if (this.compactMappedFile != null) {
+ this.compactMappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
+ }
+ log.info("IndexStoreService destroy local file, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get());
+ break;
+ case UPLOAD:
+ log.warn("[BUG] IndexStoreService destroy remote file, timestamp: {}", this.getTimestamp());
+ }
+ } catch (Exception e) {
+ log.error("IndexStoreService destroy failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
+ } finally {
+ fileReadWriteLock.writeLock().unlock();
+ }
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
new file mode 100644
index 00000000000..ddb64efa1d8
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
+import org.apache.rocketmq.tieredstore.file.TieredFlatFile;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+public class IndexStoreService extends ServiceThread implements IndexService {
+
+ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ public static final String FILE_DIRECTORY_NAME = "tiered_index_file";
+ public static final String FILE_COMPACTED_DIRECTORY_NAME = "compacting";
+
+ /**
+ * File status in table example:
+ * upload, upload, upload, sealed, sealed, unsealed
+ */
+ private final TieredMessageStoreConfig storeConfig;
+ private final ConcurrentSkipListMap timeStoreTable;
+ private final ReadWriteLock readWriteLock;
+ private final AtomicLong compactTimestamp;
+ private final String filePath;
+ private final TieredFileAllocator fileAllocator;
+
+ private IndexFile currentWriteFile;
+ private TieredFlatFile flatFile;
+
+ public IndexStoreService(TieredFileAllocator fileAllocator, String filePath) {
+ this.storeConfig = fileAllocator.getStoreConfig();
+ this.filePath = filePath;
+ this.fileAllocator = fileAllocator;
+ this.timeStoreTable = new ConcurrentSkipListMap<>();
+ this.compactTimestamp = new AtomicLong(0L);
+ this.readWriteLock = new ReentrantReadWriteLock();
+ this.recover();
+ }
+
+ private void doConvertOldFormatFile(String filePath) {
+ try {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ return;
+ }
+ MappedFile mappedFile = new DefaultMappedFile(file.getPath(), (int) file.length());
+ long timestamp = mappedFile.getMappedByteBuffer().getLong(IndexStoreFile.INDEX_BEGIN_TIME_STAMP);
+ mappedFile.renameTo(String.valueOf(new File(file.getParent(), String.valueOf(timestamp))));
+ mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+ } catch (Exception e) {
+ log.error("IndexStoreService do convert old format error, file: {}", filePath, e);
+ }
+ }
+
+ private void recover() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ // recover local
+ File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString());
+ this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString());
+ this.doConvertOldFormatFile(Paths.get(dir.getPath(), "1111").toString());
+ File[] files = dir.listFiles();
+
+ if (files != null) {
+ List fileList = Arrays.asList(files);
+ fileList.sort(Comparator.comparing(File::getName));
+
+ for (File file : fileList) {
+ if (file.isDirectory() || !StringUtils.isNumeric(file.getName())) {
+ continue;
+ }
+
+ try {
+ IndexFile indexFile = new IndexStoreFile(storeConfig, Long.parseLong(file.getName()));
+ timeStoreTable.put(indexFile.getTimestamp(), indexFile);
+ log.info("IndexStoreService recover load local file, timestamp: {}", indexFile.getTimestamp());
+ } catch (Exception e) {
+ log.error("IndexStoreService recover, load local file error", e);
+ }
+ }
+ }
+
+ if (this.timeStoreTable.isEmpty()) {
+ this.createNewIndexFile(System.currentTimeMillis());
+ }
+
+ this.currentWriteFile = this.timeStoreTable.lastEntry().getValue();
+ this.setCompactTimestamp(this.timeStoreTable.firstKey() - 1);
+
+ // recover remote
+ this.flatFile = fileAllocator.createFlatFileForIndexFile(filePath);
+ if (this.flatFile.getBaseOffset() == -1) {
+ this.flatFile.setBaseOffset(0);
+ }
+
+ for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
+ IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
+ timeStoreTable.put(indexFile.getTimestamp(), indexFile);
+ log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp());
+ }
+
+ log.info("IndexStoreService recover finished, entrySize: {}, cost: {}ms, filePath: {}",
+ timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS), dir.getAbsolutePath());
+ }
+
+ public void createNewIndexFile(long timestamp) {
+ try {
+ this.readWriteLock.writeLock().lock();
+ IndexFile indexFile = this.currentWriteFile;
+ if (this.timeStoreTable.containsKey(timestamp) ||
+ indexFile != null && IndexFile.IndexStatusEnum.UNSEALED.equals(indexFile.getFileStatus())) {
+ return;
+ }
+ IndexStoreFile newStoreFile = new IndexStoreFile(storeConfig, timestamp);
+ this.timeStoreTable.put(timestamp, newStoreFile);
+ this.currentWriteFile = newStoreFile;
+ log.info("IndexStoreService construct next file, timestamp: {}", timestamp);
+ } catch (Exception e) {
+ log.error("IndexStoreService construct next file, timestamp: {}", timestamp, e);
+ } finally {
+ this.readWriteLock.writeLock().unlock();
+ }
+ }
+
+ @VisibleForTesting
+ public ConcurrentSkipListMap getTimeStoreTable() {
+ return timeStoreTable;
+ }
+
+ @Override
+ public AppendResult putKey(
+ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp) {
+
+ if (StringUtils.isBlank(topic)) {
+ return AppendResult.UNKNOWN_ERROR;
+ }
+
+ if (keySet == null || keySet.isEmpty()) {
+ return AppendResult.SUCCESS;
+ }
+
+ for (int i = 0; i < 3; i++) {
+ AppendResult result = this.currentWriteFile.putKey(
+ topic, topicId, queueId, keySet, offset, size, timestamp);
+
+ if (AppendResult.SUCCESS.equals(result)) {
+ return AppendResult.SUCCESS;
+ } else if (AppendResult.FILE_FULL.equals(result)) {
+ this.createNewIndexFile(timestamp);
+ }
+ }
+
+ log.error("IndexStoreService put key three times return error, topic: {}, topicId: {}, " +
+ "queueId: {}, keySize: {}, timestamp: {}", topic, topicId, queueId, keySet.size(), timestamp);
+ return AppendResult.UNKNOWN_ERROR;
+ }
+
+ @Override
+ public CompletableFuture> queryAsync(
+ String topic, String key, int maxCount, long beginTime, long endTime) {
+
+ CompletableFuture> future = new CompletableFuture<>();
+ try {
+ readWriteLock.readLock().lock();
+ ConcurrentNavigableMap pendingMap =
+ this.timeStoreTable.subMap(beginTime, true, endTime, true);
+ List> futureList = new ArrayList<>(pendingMap.size());
+ ConcurrentHashMap result = new ConcurrentHashMap<>();
+
+ for (Map.Entry entry : pendingMap.descendingMap().entrySet()) {
+ CompletableFuture completableFuture = entry.getValue()
+ .queryAsync(topic, key, maxCount, beginTime, endTime)
+ .thenAccept(itemList -> itemList.forEach(indexItem -> {
+ if (result.size() < maxCount) {
+ result.put(String.format(
+ "%d-%d", indexItem.getQueueId(), indexItem.getOffset()), indexItem);
+ }
+ }));
+ futureList.add(completableFuture);
+ }
+
+ CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
+ .whenComplete((v, t) -> {
+ // Try to return the query results as much as possible here
+ // rather than directly throwing exceptions
+ if (result.isEmpty() && t != null) {
+ future.completeExceptionally(t);
+ } else {
+ List resultList = new ArrayList<>(result.values());
+ future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount)));
+ }
+ });
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ return future;
+ }
+
+ public void doCompactThenUploadFile(IndexFile indexFile) {
+ if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
+ log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
+ indexFile.getTimestamp(), indexFile.getFileStatus());
+ return;
+ }
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ ByteBuffer byteBuffer = indexFile.doCompaction();
+ if (byteBuffer == null) {
+ log.error("IndexStoreService found compaction buffer is null, timestamp: {}", indexFile.getTimestamp());
+ return;
+ }
+ flatFile.append(byteBuffer);
+ flatFile.commit(true);
+
+ TieredFileSegment fileSegment = flatFile.getFileByIndex(flatFile.getFileSegmentCount() - 1);
+ if (fileSegment == null || fileSegment.getMinTimestamp() != indexFile.getTimestamp()) {
+ log.warn("IndexStoreService submit compacted file to server failed, timestamp: {}", indexFile.getTimestamp());
+ return;
+ }
+
+ try {
+ readWriteLock.writeLock().lock();
+ IndexFile storeFile = new IndexStoreFile(storeConfig, fileSegment);
+ timeStoreTable.put(indexFile.getTimestamp(), storeFile);
+ indexFile.destroy();
+ } catch (Exception e) {
+ log.error("IndexStoreService switch file failed, timestamp: {}, cost: {}ms",
+ indexFile.getTimestamp(), stopwatch.elapsed(TimeUnit.MILLISECONDS), e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ public void destroyExpiredFile(long expireTimestamp) {
+ flatFile.cleanExpiredFile(expireTimestamp);
+ flatFile.destroyExpiredFile();
+ }
+
+ public void destroy() {
+ try {
+ readWriteLock.writeLock().lock();
+
+ // delete local store file
+ for (Map.Entry entry : timeStoreTable.entrySet()) {
+ IndexFile indexFile = entry.getValue();
+ if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
+ continue;
+ }
+ indexFile.destroy();
+ }
+
+ // delete remote
+ if (flatFile != null) {
+ flatFile.destroy();
+ }
+ } catch (Exception e) {
+ log.error("IndexStoreService destroy all file error", e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return IndexStoreService.class.getSimpleName();
+ }
+
+ public void setCompactTimestamp(long timestamp) {
+ this.compactTimestamp.set(timestamp);
+ log.info("IndexStoreService compact timestamp has been set to: {}", timestamp);
+ }
+
+ protected IndexFile getNextSealedFile() {
+ try {
+ Map.Entry entry =
+ this.timeStoreTable.higherEntry(this.compactTimestamp.get());
+ if (entry != null && entry.getKey() < this.timeStoreTable.lastKey()) {
+ return entry.getValue();
+ }
+ } catch (Throwable e) {
+ log.error("Error occurred in " + getServiceName(), e);
+ }
+ return null;
+ }
+
+ @Override
+ public void run() {
+ log.info(this.getServiceName() + " service started");
+ while (!this.isStopped()) {
+ long expireTimestamp = System.currentTimeMillis()
+ - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
+ this.destroyExpiredFile(expireTimestamp);
+
+ IndexFile indexFile = this.getNextSealedFile();
+ if (indexFile == null) {
+ this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
+ continue;
+ }
+ this.doCompactThenUploadFile(indexFile);
+ this.setCompactTimestamp(indexFile.getTimestamp());
+ }
+ log.info(this.getServiceName() + " service shutdown");
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ for (Map.Entry entry : timeStoreTable.entrySet()) {
+ entry.getValue().shutdown();
+ }
+ log.info("IndexStoreService shutdown gracefully");
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 32911a6e898..aad42de98d8 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -31,12 +31,14 @@
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import static org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_BEGIN_TIME_STAMP;
+import static org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_END_TIME_STAMP;
+
public abstract class TieredFileSegment implements Comparable, TieredStoreProvider {
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
@@ -198,8 +200,9 @@ public AppendResult append(ByteBuffer byteBuf, long timestamp) {
}
if (fileType == FileSegmentType.INDEX) {
- minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
- maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+ minTimestamp = byteBuf.getLong(INDEX_BEGIN_TIME_STAMP);
+ maxTimestamp = byteBuf.getLong(INDEX_END_TIME_STAMP);
+
appendPosition += byteBuf.remaining();
// IndexFile is large and not change after compaction, no need deep copy
bufferList.add(byteBuf);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 0db3eaf8f44..b9938b7a8a0 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -59,7 +59,7 @@ public interface TieredStoreProvider {
* Get data from backend file system
*
* @param position the index from where the file will be read
- * @param length the data size will be read
+ * @param length the data size will be read
* @return data to be read
*/
CompletableFuture read0(long position, int length);
@@ -68,10 +68,10 @@ public interface TieredStoreProvider {
* Put data to backend file system
*
* @param inputStream data stream
- * @param position backend file position to put, used in append mode
- * @param length data size in stream
- * @param append try to append or create a new file
+ * @param position backend file position to put, used in append mode
+ * @param length data size in stream
+ * @param append try to append or create a new file
* @return put result, true
if data successfully write; false
otherwise
*/
- CompletableFuture commit0(FileSegmentInputStream inputStream,long position, int length, boolean append);
+ CompletableFuture commit0(FileSegmentInputStream inputStream, long position, int length, boolean append);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 7e949cb28cc..59c1d8ddaf8 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -74,7 +74,7 @@ public PosixFileSegment(TieredMessageStoreConfig storeConfig,
String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + UNDERLINE + brokerClusterName;
this.fullPath = Paths.get(basePath, clusterBasePath, filePath,
fileType.toString(), TieredStoreUtil.offset2FileName(baseOffset)).toString();
- logger.info("Constructing Posix FileSegment, filePath: {}", fullPath);
+ logger.debug("Constructing Posix FileSegment, filePath: {}", fullPath);
createFile();
}
@@ -159,6 +159,7 @@ public CompletableFuture read0(long position, int length) {
readFileChannel.position(position);
readFileChannel.read(byteBuffer);
byteBuffer.flip();
+ byteBuffer.limit(length);
attributesBuilder.put(LABEL_SUCCESS, true);
long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 774c6cf646e..4e0d7e69794 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -37,7 +37,6 @@
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -83,6 +82,7 @@ public Triple buildFetcher() {
Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, getMessageResult.getStatus());
CompositeFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq);
+ Assert.assertNotNull(flatFile);
flatFile.initOffset(0);
getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
@@ -197,6 +197,7 @@ public void testGetMessageAsync() {
public void testGetMessageStoreTimeStampAsync() {
TieredMessageFetcher fetcher = new TieredMessageFetcher(storeConfig);
CompositeFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq);
+ Assert.assertNotNull(flatFile);
flatFile.initOffset(0);
ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
@@ -270,6 +271,7 @@ public void testQueryMessageAsync() {
CompositeQueueFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq);
Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
+ Assert.assertNotNull(flatFile);
flatFile.initOffset(0);
ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
@@ -281,20 +283,19 @@ public void testQueryMessageAsync() {
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2);
flatFile.appendCommitLog(buffer);
- DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, null);
+ long timestamp = System.currentTimeMillis();
+ DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, timestamp, 0, "", "key", 0, 0, null);
flatFile.appendIndexFile(request);
- request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, null);
+ request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 1, 0, "", "key", 0, 0, null);
flatFile.appendIndexFile(request);
- request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "another-key", 0, 0, null);
+ request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 2, 0, "", "another-key", 0, 0, null);
flatFile.appendIndexFile(request);
flatFile.commit(true);
- TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
- indexFile.commit(true);
Assert.assertEquals(1, fetcher.queryMessageAsync(mq.getTopic(), "key", 1, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
QueryMessageResult result = fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join();
Assert.assertEquals(2, result.getMessageMapedList().size());
- Assert.assertEquals(1, result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
- Assert.assertEquals(0, result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
+ Assert.assertEquals(0, result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
+ Assert.assertEquals(1, result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
deleted file mode 100644
index 2da72bc7a70..00000000000
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.file;
-
-import com.sun.jna.Platform;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.List;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TieredIndexFileTest {
-
- private final String storePath = TieredStoreTestUtil.getRandomStorePath();
- private MessageQueue mq;
- private TieredMessageStoreConfig storeConfig;
-
- @Before
- public void setUp() {
- storeConfig = new TieredMessageStoreConfig();
- storeConfig.setBrokerName("IndexFileBroker");
- storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
- storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
- storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
- mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1);
- TieredStoreUtil.getMetadataStore(storeConfig);
- TieredStoreExecutor.init();
- }
-
- @After
- public void tearDown() throws IOException {
- TieredStoreTestUtil.destroyMetadataStore();
- TieredStoreTestUtil.destroyTempDir(storePath);
- TieredStoreExecutor.shutdown();
- }
-
- @Test
- public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException {
- if (Platform.isWindows()) {
- return;
- }
-
- TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig);
- TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath);
-
- indexFile.append(mq, 0, "key3", 3, 300, 1000);
- indexFile.append(mq, 0, "key2", 2, 200, 1100);
- indexFile.append(mq, 0, "key1", 1, 100, 1200);
-
- // do not do schedule task here
- TieredStoreExecutor.shutdown();
- List> indexList =
- indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
- Assert.assertEquals(0, indexList.size());
-
- // do compaction once
- TieredStoreExecutor.init();
- storeConfig.setTieredStoreIndexFileRollingIdleInterval(0);
- indexFile.doScheduleTask();
- Awaitility.await().atMost(Duration.ofSeconds(10))
- .until(() -> !indexFile.getPreMappedFile().getFile().exists());
-
- indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
- Assert.assertEquals(1, indexList.size());
-
- indexFile.destroy();
- }
-}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
new file mode 100644
index 00000000000..22ed4cc1803
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import java.nio.ByteBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IndexItemTest {
+
+ private final int topicId = 1;
+ private final int queueId = 2;
+ private final long offset = 3L;
+ private final int size = 4;
+ private final int hashCode = 5;
+ private final int timeDiff = 6;
+ private final int itemIndex = 7;
+
+ @Test
+ public void indexItemConstructorTest() {
+ IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex);
+
+ Assert.assertEquals(topicId, indexItem.getTopicId());
+ Assert.assertEquals(queueId, indexItem.getQueueId());
+ Assert.assertEquals(offset, indexItem.getOffset());
+ Assert.assertEquals(size, indexItem.getSize());
+ Assert.assertEquals(hashCode, indexItem.getHashCode());
+ Assert.assertEquals(timeDiff, indexItem.getTimeDiff());
+ Assert.assertEquals(itemIndex, indexItem.getItemIndex());
+ }
+
+ @Test
+ public void byteBufferConstructorTest() {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(IndexItem.INDEX_ITEM_SIZE);
+ byteBuffer.putInt(hashCode);
+ byteBuffer.putInt(topicId);
+ byteBuffer.putInt(queueId);
+ byteBuffer.putLong(offset);
+ byteBuffer.putInt(size);
+ byteBuffer.putInt(timeDiff);
+ byteBuffer.putInt(itemIndex);
+
+ byte[] bytes = byteBuffer.array();
+ IndexItem indexItem = new IndexItem(bytes);
+
+ Assert.assertEquals(topicId, indexItem.getTopicId());
+ Assert.assertEquals(queueId, indexItem.getQueueId());
+ Assert.assertEquals(offset, indexItem.getOffset());
+ Assert.assertEquals(size, indexItem.getSize());
+ Assert.assertEquals(hashCode, indexItem.getHashCode());
+ Assert.assertEquals(timeDiff, indexItem.getTimeDiff());
+ Assert.assertEquals(itemIndex, indexItem.getItemIndex());
+ Assert.assertNotNull(indexItem.toString());
+
+ Exception exception = null;
+ try {
+ new IndexItem(null);
+ } catch (Exception e) {
+ exception = e;
+ }
+ Assert.assertNotNull(exception);
+ }
+
+ @Test
+ public void getByteBufferTest() {
+ IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex);
+ ByteBuffer byteBuffer = indexItem.getByteBuffer();
+ Assert.assertEquals(hashCode, byteBuffer.getInt(0));
+ Assert.assertEquals(topicId, byteBuffer.getInt(4));
+ Assert.assertEquals(queueId, byteBuffer.getInt(8));
+ Assert.assertEquals(offset, byteBuffer.getLong(12));
+ Assert.assertEquals(size, byteBuffer.getInt(20));
+ Assert.assertEquals(timeDiff, byteBuffer.getInt(24));
+ Assert.assertEquals(itemIndex, byteBuffer.getInt(28));
+ }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
new file mode 100644
index 00000000000..b9ffb7ce940
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.FileSegmentType;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexStoreFileTest {
+
+ private static final String TOPIC_NAME = "TopicTest";
+ private static final int TOPIC_ID = 123;
+ private static final int QUEUE_ID = 2;
+ private static final long MESSAGE_OFFSET = 666L;
+ private static final int MESSAGE_SIZE = 1024;
+ private static final String KEY = "MessageKey";
+ private static final Set KEY_SET = Collections.singleton(KEY);
+
+ private TieredMessageStoreConfig storeConfig;
+ private IndexStoreFile indexStoreFile;
+
+ @Before
+ public void init() throws IOException {
+ String filePath = Paths.get(System.getProperty("user.home"), "store_test", "index").toString();
+ TieredStoreExecutor.init();
+ storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setTieredStoreFilePath(filePath);
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+ indexStoreFile = new IndexStoreFile(storeConfig, System.currentTimeMillis());
+ }
+
+ @After
+ public void shutdown() {
+ if (this.indexStoreFile != null) {
+ this.indexStoreFile.shutdown();
+ this.indexStoreFile.destroy();
+ }
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath());
+ TieredStoreExecutor.shutdown();
+ }
+
+ @Test
+ public void testIndexHeaderConstants() {
+ Assert.assertEquals(0, IndexStoreFile.INDEX_MAGIC_CODE);
+ Assert.assertEquals(4, IndexStoreFile.INDEX_BEGIN_TIME_STAMP);
+ Assert.assertEquals(12, IndexStoreFile.INDEX_END_TIME_STAMP);
+ Assert.assertEquals(20, IndexStoreFile.INDEX_SLOT_COUNT);
+ Assert.assertEquals(24, IndexStoreFile.INDEX_ITEM_INDEX);
+ Assert.assertEquals(28, IndexStoreFile.INDEX_HEADER_SIZE);
+ Assert.assertEquals(0xCCDDEEFF ^ 1880681586 + 4, IndexStoreFile.BEGIN_MAGIC_CODE);
+ Assert.assertEquals(0xCCDDEEFF ^ 1880681586 + 8, IndexStoreFile.END_MAGIC_CODE);
+ }
+
+ @Test
+ public void basicMethodTest() throws IOException {
+ long timestamp = System.currentTimeMillis();
+ IndexStoreFile localFile = new IndexStoreFile(storeConfig, timestamp);
+ Assert.assertEquals(timestamp, localFile.getTimestamp());
+
+ // test file status
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, localFile.getFileStatus());
+ localFile.doCompaction();
+ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, localFile.getFileStatus());
+
+ // test hash
+ Assert.assertEquals("TopicTest#MessageKey", localFile.buildKey(TOPIC_NAME, KEY));
+ Assert.assertEquals(638347386, indexStoreFile.hashCode(localFile.buildKey(TOPIC_NAME, KEY)));
+
+ // test calculate position
+ long headerSize = IndexStoreFile.INDEX_HEADER_SIZE;
+ Assert.assertEquals(headerSize + Long.BYTES * 2, indexStoreFile.getSlotPosition(2));
+ Assert.assertEquals(headerSize + Long.BYTES * 5, indexStoreFile.getSlotPosition(5));
+ Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 2,
+ indexStoreFile.getItemPosition(2));
+ Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 5,
+ indexStoreFile.getItemPosition(5));
+ }
+
+ @Test
+ public void basicPutGetTest() {
+ long timestamp = indexStoreFile.getTimestamp();
+
+ // check metadata
+ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+ Assert.assertEquals(0, indexStoreFile.getEndTimestamp());
+ Assert.assertEquals(0, indexStoreFile.getIndexItemCount());
+ Assert.assertEquals(0, indexStoreFile.getHashSlotCount());
+
+ // not put success
+ Assert.assertEquals(AppendResult.UNKNOWN_ERROR, indexStoreFile.putKey(
+ null, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, null, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.emptySet(), MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+
+ // first item is invalid
+ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum() - 2; i++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
+ Assert.assertEquals(1, indexStoreFile.getHashSlotCount());
+ Assert.assertEquals(i + 1, indexStoreFile.getIndexItemCount());
+ }
+
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+
+ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
+ Assert.assertEquals(1, indexStoreFile.getHashSlotCount());
+ Assert.assertEquals(storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1, indexStoreFile.getIndexItemCount());
+ }
+
+ @Test
+ public void differentKeyPutTest() {
+ long timestamp = indexStoreFile.getTimestamp();
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 3; j++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME + i, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ }
+ }
+ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
+ Assert.assertEquals(5, indexStoreFile.getHashSlotCount());
+ Assert.assertEquals(5 * 3, indexStoreFile.getIndexItemCount());
+ }
+
+ @Test
+ public void concurrentPutTest() throws InterruptedException {
+ long timestamp = indexStoreFile.getTimestamp();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ 4, new ThreadFactoryImpl("ConcurrentPutGetTest"));
+
+ // first item is invalid
+ int indexCount = storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1;
+ CountDownLatch latch = new CountDownLatch(indexCount);
+ for (int i = 0; i < indexCount; i++) {
+ executorService.submit(() -> {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ latch.countDown();
+ });
+ }
+ latch.await();
+
+ executorService.shutdown();
+ Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
+ }
+
+ @Test
+ public void recoverFileTest() throws IOException {
+ int indexCount = 10;
+ long timestamp = indexStoreFile.getTimestamp();
+ for (int i = 0; i < indexCount; i++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ }
+ indexStoreFile.shutdown();
+ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
+ indexStoreFile = new IndexStoreFile(storeConfig, timestamp);
+ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
+ }
+
+ @Test
+ public void doCompactionTest() throws Exception {
+ long timestamp = indexStoreFile.getTimestamp();
+ indexStoreFile = new IndexStoreFile(storeConfig, System.currentTimeMillis());
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ }
+
+ ByteBuffer byteBuffer = indexStoreFile.doCompaction();
+ TieredFileSegment fileSegment = new PosixFileSegment(
+ storeConfig, FileSegmentType.INDEX, "store_index", 0L);
+ fileSegment.append(byteBuffer, timestamp);
+ fileSegment.commit();
+ Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
+ fileSegment.destroyFile();
+ }
+
+ @Test
+ public void queryAsyncFromUnsealedFileTest() throws Exception {
+ long timestamp = indexStoreFile.getTimestamp();
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 3; j++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i,
+ TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis()));
+ }
+ }
+ List itemList = indexStoreFile.queryAsync(
+ TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(3, itemList.size());
+ }
+
+ @Test
+ public void queryAsyncFromSegmentFileTest() throws ExecutionException, InterruptedException {
+ long timestamp = indexStoreFile.getTimestamp();
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 3; j++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i,
+ TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis()));
+ }
+ }
+
+ ByteBuffer byteBuffer = indexStoreFile.doCompaction();
+ TieredFileSegment fileSegment = new PosixFileSegment(
+ storeConfig, FileSegmentType.INDEX, "store_index", 0L);
+ fileSegment.append(byteBuffer, timestamp);
+ fileSegment.commit();
+ Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
+ indexStoreFile.destroy();
+
+ indexStoreFile = new IndexStoreFile(storeConfig, fileSegment);
+
+ // change topic
+ List itemList = indexStoreFile.queryAsync(
+ TOPIC_NAME, KEY, 64, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(0, itemList.size());
+
+ // change key
+ itemList = indexStoreFile.queryAsync(
+ TOPIC_NAME, KEY + "1", 64, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(0, itemList.size());
+
+ itemList = indexStoreFile.queryAsync(
+ TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(3, itemList.size());
+ }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
new file mode 100644
index 00000000000..23ac8ed845f
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgs = {"-Djava.net.preferIPv4Stack=true", "-Djmh.rmi.port=1099"})
+public class IndexStoreServiceBenchTest {
+
+ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ private static final String TOPIC_NAME = "TopicTest";
+ private TieredMessageStoreConfig storeConfig;
+ private IndexStoreService indexStoreService;
+ private final LongAdder failureCount = new LongAdder();
+
+ @Setup
+ public void init() throws ClassNotFoundException, NoSuchMethodException {
+ String storePath = Paths.get(System.getProperty("user.home"), "store_test", "index").toString();
+ UtilAll.deleteFile(new File(storePath));
+ UtilAll.deleteFile(new File("./e96d41b2_IndexService"));
+ storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setBrokerClusterName("IndexService");
+ storeConfig.setBrokerName("IndexServiceBroker");
+ storeConfig.setStorePathRootDir(storePath);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500 * 1000);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000 * 1000);
+ TieredStoreUtil.getMetadataStore(storeConfig);
+ TieredStoreExecutor.init();
+ TieredFileAllocator tieredFileAllocator = new TieredFileAllocator(storeConfig);
+ indexStoreService = new IndexStoreService(tieredFileAllocator, storePath);
+ indexStoreService.start();
+ }
+
+ @TearDown
+ public void shutdown() throws IOException {
+ indexStoreService.shutdown();
+ indexStoreService.destroy();
+ TieredStoreExecutor.shutdown();
+ }
+
+ //@Benchmark
+ @Threads(2)
+ @BenchmarkMode(Mode.Throughput)
+ @OutputTimeUnit(TimeUnit.SECONDS)
+ @Warmup(iterations = 1, time = 1)
+ @Measurement(iterations = 1, time = 1)
+ public void doPutThroughputBenchmark() {
+ for (int i = 0; i < 100; i++) {
+ AppendResult result = indexStoreService.putKey(
+ TOPIC_NAME, 123, 2, Collections.singleton(String.valueOf(i)),
+ i * 100L, i * 100, System.currentTimeMillis());
+ if (AppendResult.SUCCESS.equals(result)) {
+ failureCount.increment();
+ }
+ }
+ }
+
+ @Threads(1)
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.SECONDS)
+ @Warmup(iterations = 0)
+ @Measurement(iterations = 1, time = 1)
+ public void doGetThroughputBenchmark() throws ExecutionException, InterruptedException {
+ for (int j = 0; j < 10; j++) {
+ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
+ indexStoreService.putKey(
+ "TopicTest", 123, j, Collections.singleton(String.valueOf(i)),
+ i * 100L, i * 100, System.currentTimeMillis());
+ }
+ }
+
+ int queryCount = 100 * 10000;
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ for (int i = 0; i < queryCount; i++) {
+ List indexItems = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i),
+ 20, 0, System.currentTimeMillis()).get();
+ Assert.assertEquals(10, indexItems.size());
+
+ List indexItems2 = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i),
+ 5, 0, System.currentTimeMillis()).get();
+ Assert.assertEquals(5, indexItems2.size());
+ }
+ log.info("DoGetThroughputBenchmark test cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options opt = new OptionsBuilder()
+ .include(IndexStoreServiceBenchTest.class.getSimpleName())
+ .warmupIterations(0)
+ .measurementIterations(1)
+ .result("result.json")
+ .resultFormat(ResultFormatType.JSON)
+ .build();
+ new Runner(opt).run();
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
new file mode 100644
index 00000000000..1a6966abd1d
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class IndexStoreServiceTest {
+
+ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ private static final String TOPIC_NAME = "TopicTest";
+ private static final int TOPIC_ID = 123;
+ private static final int QUEUE_ID = 2;
+ private static final long MESSAGE_OFFSET = 666;
+ private static final int MESSAGE_SIZE = 1024;
+ private static final Set KEY_SET = Collections.singleton("MessageKey");
+
+ private String filePath;
+ private TieredMessageStoreConfig storeConfig;
+ private TieredFileAllocator fileAllocator;
+ private IndexStoreService indexService;
+
+ @Before
+ public void init() throws IOException, ClassNotFoundException, NoSuchMethodException {
+ TieredStoreExecutor.init();
+ filePath = Paths.get(System.getProperty("user.home"), "store_test",
+ UUID.randomUUID().toString().replace("-", "").substring(0, 8)).toString();
+ storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setStorePathRootDir(filePath);
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+ fileAllocator = new TieredFileAllocator(storeConfig);
+ }
+
+ @After
+ public void shutdown() {
+ if (indexService != null) {
+ indexService.shutdown();
+ indexService.destroy();
+ }
+ TieredStoreExecutor.shutdown();
+ }
+
+ @Test
+ public void basicServiceTest() throws InterruptedException {
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ for (int i = 0; i < 50; i++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, i * 100, MESSAGE_SIZE, System.currentTimeMillis()));
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable();
+ Assert.assertEquals(3, timeStoreTable.size());
+ }
+
+ @Test
+ public void doConvertOldFormatTest() {
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ long timestamp = indexService.getTimeStoreTable().firstKey();
+ Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+ indexService.shutdown();
+
+ File file = new File(Paths.get(filePath, IndexStoreService.FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString());
+ log.info(filePath);
+ log.info(file.getPath());
+ Assert.assertTrue(file.exists());
+
+ Assert.assertTrue(file.renameTo(new File(file.getParent(), "0000")));
+ file = new File(Paths.get(file.getParent(), "0000").toString());
+ Assert.assertTrue(file.exists());
+
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable();
+ Assert.assertEquals(1, timeStoreTable.size());
+ Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey());
+ }
+
+ @Test
+ public void concurrentPutTest() throws InterruptedException {
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ 4, new ThreadFactoryImpl("ConcurrentPutTest"));
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ long timestamp = System.currentTimeMillis();
+
+ // first item is invalid
+ AtomicInteger success = new AtomicInteger();
+ int indexCount = 5000;
+ CountDownLatch latch = new CountDownLatch(indexCount);
+ for (int i = 0; i < indexCount; i++) {
+ final int index = i;
+ executorService.submit(() -> {
+ try {
+ AppendResult result = indexService.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(index)),
+ index * 100, MESSAGE_SIZE, timestamp + index);
+ if (AppendResult.SUCCESS.equals(result)) {
+ success.incrementAndGet();
+ }
+ } catch (Exception e) {
+ log.error("ConcurrentPutTest error", e);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(3, indexService.getTimeStoreTable().size());
+ executorService.shutdown();
+ }
+
+ @Test
+ public void doCompactionTest() throws InterruptedException {
+ concurrentPutTest();
+ IndexFile indexFile = indexService.getNextSealedFile();
+ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, indexFile.getFileStatus());
+
+ indexService.doCompactThenUploadFile(indexFile);
+ indexService.setCompactTimestamp(indexFile.getTimestamp());
+ indexFile.destroy();
+
+ List files = new ArrayList<>(indexService.getTimeStoreTable().values());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, files.get(1).getFileStatus());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus());
+
+ indexFile = indexService.getNextSealedFile();
+ indexService.doCompactThenUploadFile(indexFile);
+ indexService.setCompactTimestamp(indexFile.getTimestamp());
+ files = new ArrayList<>(indexService.getTimeStoreTable().values());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(1).getFileStatus());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus());
+
+ indexFile = indexService.getNextSealedFile();
+ Assert.assertNull(indexFile);
+ files = new ArrayList<>(indexService.getTimeStoreTable().values());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(1).getFileStatus());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus());
+ }
+
+ @Test
+ public void runServiceTest() throws InterruptedException {
+ concurrentPutTest();
+ indexService.start();
+ await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> {
+ boolean result = true;
+ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values());
+ result &= IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(0).getFileStatus());
+ result &= IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(1).getFileStatus());
+ result &= IndexFile.IndexStatusEnum.UNSEALED.equals(files.get(2).getFileStatus());
+ return result;
+ });
+ }
+
+ @Test
+ public void restartServiceTest() throws InterruptedException {
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ for (int i = 0; i < 20; i++) {
+ AppendResult result = indexService.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)),
+ i * 100L, MESSAGE_SIZE, System.currentTimeMillis());
+ Assert.assertEquals(AppendResult.SUCCESS, result);
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ long timestamp = indexService.getTimeStoreTable().firstKey();
+ indexService.shutdown();
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
+
+ indexService.start();
+ await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> {
+ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values());
+ return IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(0).getFileStatus());
+ });
+ indexService.shutdown();
+
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
+ Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
+ indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
+ }
+
+ @Test
+ public void queryFromFileTest() throws InterruptedException, ExecutionException {
+ long timestamp = System.currentTimeMillis();
+ indexService = new IndexStoreService(fileAllocator, filePath);
+
+ // three files, echo contains 19 items
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 20 - 1; j++) {
+ AppendResult result = indexService.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(j)),
+ i * 100L + j, MESSAGE_SIZE, System.currentTimeMillis());
+ Assert.assertEquals(AppendResult.SUCCESS, result);
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ }
+
+ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values());
+ Assert.assertEquals(3, files.size());
+
+ for (int i = 0; i < 3; i++) {
+ List indexItems = indexService.queryAsync(
+ TOPIC_NAME, String.valueOf(1), 1, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(1, indexItems.size());
+
+ indexItems = indexService.queryAsync(
+ TOPIC_NAME, String.valueOf(1), 3, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(3, indexItems.size());
+
+ indexItems = indexService.queryAsync(
+ TOPIC_NAME, String.valueOf(1), 5, timestamp, System.currentTimeMillis()).get();
+ Assert.assertEquals(3, indexItems.size());
+ }
+ }
+
+ @Test
+ public void concurrentGetTest() throws InterruptedException {
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ indexService.start();
+
+ int fileCount = 10;
+ for (int j = 0; j < fileCount; j++) {
+ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
+ indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)),
+ i * 100L, i * 100, System.currentTimeMillis());
+ }
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+
+ CountDownLatch latch = new CountDownLatch(fileCount * 3);
+ AtomicBoolean result = new AtomicBoolean(true);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ 4, new ThreadFactoryImpl("ConcurrentGetTest"));
+
+ for (int i = 0; i < fileCount; i++) {
+ int finalI = i;
+ executorService.submit(() -> {
+ for (int j = 1; j <= 3; j++) {
+ try {
+ List indexItems = indexService.queryAsync(
+ TOPIC_NAME, String.valueOf(finalI), j * 5, 0, System.currentTimeMillis()).get();
+ if (Math.min(fileCount, j * 5) != indexItems.size()) {
+ result.set(false);
+ }
+ } catch (Exception e) {
+ result.set(false);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
+ executorService.shutdown();
+ Assert.assertTrue(result.get());
+ }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index a413f2113e1..68277cacc5e 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -135,7 +135,6 @@ public static void verifyMockedMessageBuffer(ByteBuffer buffer, int phyOffset) {
Assert.assertEquals("uservalue0", properties.get("userkey"));
}
-
@Test
public void testGetTotalSize() {
ByteBuffer buffer = buildMockedMessageBuffer();
diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml
index a7933b5efb6..ac0895e05e4 100644
--- a/tieredstore/src/test/resources/rmq.logback-test.xml
+++ b/tieredstore/src/test/resources/rmq.logback-test.xml
@@ -19,11 +19,22 @@
- %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+ ${CONSOLE_LOG_PATTERN}
+
+
-
+
+
+
+
+
+
+
+
+