Skip to content

Commit

Permalink
[ISSUE #7545] [RIP-65] Support efficient random index for massive mes…
Browse files Browse the repository at this point in the history
…sages
  • Loading branch information
lizhimins committed Nov 8, 2023
1 parent ead3d90 commit 810ca7e
Show file tree
Hide file tree
Showing 24 changed files with 2,004 additions and 698 deletions.
2 changes: 1 addition & 1 deletion style/spotbugs-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
</Match>
<Match>
<Class name="org.apache.rocketmq.tieredstore.file.TieredIndexFile"/>
<Class name="org.apache.rocketmq.tieredstore.index.TieredIndexFile"/>
<Method name="indexKeyHashMethod" />
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
</Match>
Expand Down
14 changes: 14 additions & 0 deletions tieredstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,19 @@
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.36</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.36</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand All @@ -50,15 +51,15 @@
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.index.IndexItem;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.apache.rocketmq.common.BoundaryType;

public class TieredMessageFetcher implements MessageStoreFetcher {

Expand Down Expand Up @@ -555,85 +556,51 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
public CompletableFuture<QueryMessageResult> queryMessageAsync(
String topic, String key, int maxCount, long begin, long end) {

TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
IndexService indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);

int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
long topicId;
try {
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
if (topicMetadata == null) {
LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
return CompletableFuture.completedFuture(new QueryMessageResult());
}
topicId = topicMetadata.getTopicId();
} catch (Exception e) {
LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
LOGGER.error("MessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
return CompletableFuture.completedFuture(new QueryMessageResult());
}

return indexFile.queryAsync(topic, key, begin, end)
.thenCompose(indexBufferList -> {
QueryMessageResult result = new QueryMessageResult();
int resultCount = 0;
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
for (Pair<Long, ByteBuffer> pair : indexBufferList) {
Long fileBeginTimestamp = pair.getKey();
ByteBuffer indexBuffer = pair.getValue();

if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " +
"index buffer size {} is not multiple of index item size {}",
indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
continue;
}

for (int indexOffset = indexBuffer.position();
indexOffset < indexBuffer.limit();
indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {

int indexItemHashCode = indexBuffer.getInt(indexOffset);
if (indexItemHashCode != hashCode) {
continue;
}

int indexItemTopicId = indexBuffer.getInt(indexOffset + 4);
if (indexItemTopicId != topicId) {
continue;
}

int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
CompositeFlatFile flatFile =
flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
continue;
}

// decode index item
long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
long indexTimestamp = fileBeginTimestamp + timeDiff;
if (indexTimestamp < begin || indexTimestamp > end) {
continue;
}
CompletableFuture<List<IndexItem>> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end);

CompletableFuture<Void> getMessageFuture = flatFile.getCommitLogAsync(offset, size)
.thenAccept(messageBuffer -> result.addMessage(
new SelectMappedBufferResult(0, messageBuffer, size, null)));
futureList.add(getMessageFuture);

resultCount++;
if (resultCount >= maxCount) {
break;
}
}

if (resultCount >= maxCount) {
break;
}
return future.thenCompose(indexItemList -> {
QueryMessageResult result = new QueryMessageResult();
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
for (IndexItem indexItem : indexItemList) {
if (topicId != indexItem.getTopicId()) {
continue;
}
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApply(v -> result);
});
CompositeFlatFile flatFile =
flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, indexItem.getQueueId()));
if (flatFile == null) {
continue;
}
CompletableFuture<Void> getMessageFuture = flatFile
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
.thenAccept(messageBuffer -> result.addMessage(
new SelectMappedBufferResult(
indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
futureList.add(getMessageFuture);
if (futureList.size() >= maxCount) {
break;
}
}
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
}).whenComplete((result, throwable) -> {
if (result != null) {
LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.rocketmq.tieredstore.file;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
Expand All @@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
private final MessageQueue messageQueue;
private long topicSequenceNumber;
private QueueMetadata queueMetadata;
private final TieredIndexFile indexFile;
private final IndexService indexStoreService;

public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
this.messageQueue = messageQueue;
this.recoverQueueMetadata();
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
this.indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
}

@Override
Expand Down Expand Up @@ -85,24 +89,15 @@ public AppendResult appendIndexFile(DispatchRequest request) {
return AppendResult.FILE_CLOSED;
}

Set<String> keySet = new HashSet<>(
Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR)));
if (StringUtils.isNotBlank(request.getUniqKey())) {
AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
request.getUniqKey(), request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
if (result != AppendResult.SUCCESS) {
return result;
}
keySet.add(request.getUniqKey());
}

for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) {
if (StringUtils.isNotBlank(key)) {
AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
key, request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
if (result != AppendResult.SUCCESS) {
return result;
}
}
}
return AppendResult.SUCCESS;
return indexStoreService.putKey(
messageQueue.getTopic(), (int) topicSequenceNumber, messageQueue.getQueueId(), keySet,
request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
}

public MessageQueue getMessageQueue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.common.BoundaryType;

public class TieredConsumeQueue {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public FileSegmentType getFileType() {
return fileType;
}

@VisibleForTesting
public List<TieredFileSegment> getFileSegmentList() {
return fileSegmentList;
}
Expand Down Expand Up @@ -274,7 +273,7 @@ public int getFileSegmentCount() {
}

@Nullable
protected TieredFileSegment getFileByIndex(int index) {
public TieredFileSegment getFileByIndex(int index) {
fileSegmentLock.readLock().lock();
try {
if (index < fileSegmentList.size()) {
Expand Down Expand Up @@ -354,7 +353,7 @@ protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryT
}
}

protected List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
public List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
fileSegmentLock.readLock().lock();
try {
return fileSegmentList.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.index.IndexStoreService;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;

Expand All @@ -43,7 +45,7 @@ public class TieredFlatFileManager {
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);

private static volatile TieredFlatFileManager instance;
private static volatile TieredIndexFile indexFile;
private static volatile IndexStoreService indexStoreService;

private final TieredMetadataStore metadataStore;
private final TieredMessageStoreConfig storeConfig;
Expand Down Expand Up @@ -76,25 +78,26 @@ public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeCo
return instance;
}

public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) {
public static IndexService getTieredIndexService(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return indexFile;
return indexStoreService;
}

if (indexFile == null) {
if (indexStoreService == null) {
synchronized (TieredFlatFileManager.class) {
if (indexFile == null) {
if (indexStoreService == null) {
try {
String filePath = TieredStoreUtil.toPath(new MessageQueue(
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), filePath);
indexStoreService.start();
} catch (Exception e) {
logger.error("Construct FlatFileManager indexFile error", e);
}
}
}
}
return indexFile;
return indexStoreService;
}

public void doCommit() {
Expand All @@ -120,15 +123,6 @@ public void doCommit() {
}
}, delay, TimeUnit.MILLISECONDS);
}
TieredStoreExecutor.commitExecutor.schedule(() -> {
try {
if (indexFile != null) {
indexFile.commit(true);
}
} catch (Throwable e) {
logger.error("Commit indexFile periodically failed", e);
}
}, 0, TimeUnit.MILLISECONDS);
}

public void doCleanExpiredFile() {
Expand All @@ -148,10 +142,6 @@ public void doCleanExpiredFile() {
}
});
}
if (indexFile != null) {
indexFile.cleanExpiredFile(expiredTimeStamp);
indexFile.destroyExpiredFile();
}
}

private void doScheduleTask() {
Expand Down Expand Up @@ -244,7 +234,7 @@ public void cleanup() {

private static void cleanStaticReference() {
instance = null;
indexFile = null;
indexStoreService = null;
}

@Nullable
Expand All @@ -271,17 +261,17 @@ public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
}

public void shutdown() {
if (indexFile != null) {
indexFile.commit(true);
if (indexStoreService != null) {
indexStoreService.shutdown();
}
for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) {
flatFile.shutdown();
}
}

public void destroy() {
if (indexFile != null) {
indexFile.destroy();
if (indexStoreService != null) {
indexStoreService.destroy();
}
ImmutableList<CompositeQueueFlatFile> flatFileList = deepCopyFlatFileToList();
cleanup();
Expand Down
Loading

0 comments on commit 810ca7e

Please sign in to comment.