From 2b9bff7d1d2e2705d27a743f797828870c333297 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Wed, 25 Oct 2023 10:29:23 +0800 Subject: [PATCH] add unit test --- .../org/apache/rocketmq/store/CommitLog.java | 5 +- .../store/config/MessageStoreConfig.java | 1 + .../rocketmq/store/AppendPropCRCTest.java | 200 ++++++++++++++++++ 3 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3786a137fc4..3d3ee86b8f1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.store; -import com.sun.jna.NativeLong; -import com.sun.jna.Pointer; import java.net.Inet6Address; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -35,6 +33,8 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import java.util.stream.Collectors; +import com.sun.jna.NativeLong; +import com.sun.jna.Pointer; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; @@ -61,6 +61,7 @@ import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.util.LibC; import org.rocksdb.RocksDBException; + import sun.nio.ch.DirectBuffer; /** diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index c3aa7b1ed57..8cb3ea6e9ee 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.config; import java.io.File; + import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.StoreType; diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java new file mode 100644 index 00000000000..0d66c780317 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AppendPropCRCTest { + + AppendMessageCallback callback; + + MessageExtEncoder encoder; + + CommitLog commitLog; + + @Before + public void init() throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); + messageStoreConfig.setForceVerifyPropCRC(true); + messageStoreConfig.setEnabledAppendPropCRC(true); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); + commitLog = new CommitLog(messageStore); + encoder = new MessageExtEncoder(10 * 1024 * 1024, true); + callback = commitLog.new DefaultAppendMessageCallback(); + } + + @After + public void destroy() { + UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore")); + } + + @Test + public void testAppendMessageSucc() throws Exception { + String topic = "test-topic"; + int queue = 0; + int msgNum = 10; + int propertiesLen = 0; + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + msg.putUserProperty("a", "aaaaaaaa"); + msg.putUserProperty("b", "bbbbbbbb"); + msg.putUserProperty("c", "cccccccc"); + msg.putUserProperty("d", "dddddddd"); + msg.putUserProperty("e", "eeeeeeee"); + msg.putUserProperty("f", "ffffffff"); + + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(queue); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); + messageExtBrokerInner.setBody(msg.getBody()); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + propertiesLen = messageExtBrokerInner.getPropertiesString().length(); + + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + for (int i = 0; i < msgNum; i++) { + encoder.encode(messageExtBrokerInner); + messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer()); + AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null); + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + } + // 未修改消息时期望通过 + buff.flip(); + for (int i = 0; i < msgNum - 1; i++) { + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertTrue(request.isSuccess()); + } + // 修改最后一条消息的properties,期望校验失败 + int idx = buff.limit() - (propertiesLen / 2); + buff.put(idx, (byte) (buff.get(idx) + 1)); + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertFalse(request.isSuccess()); + } + + @Test + public void testAppendMessageBatchSucc() throws Exception { + List messages = new ArrayList<>(); + String topic = "test-topic"; + int queue = 0; + int propertiesLen = 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + msg.putUserProperty("a", "aaaaaaaa"); + msg.putUserProperty("b", "bbbbbbbb"); + msg.putUserProperty("c", "cccccccc"); + msg.putUserProperty("d", "dddddddd"); + msg.putUserProperty("e", "eeeeeeee"); + msg.putUserProperty("f", "ffffffff"); + String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties()); + propertiesLen = propertiesString.length(); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + //encounter end of file when append half of the data + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); + + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + assertEquals(0, allresult.getWroteOffset()); + assertEquals(0, allresult.getLogicsOffset()); + assertEquals(buff.position(), allresult.getWroteBytes()); + + assertEquals(messages.size(), allresult.getMsgNum()); + + Set msgIds = new HashSet<>(); + for (String msgId : allresult.getMsgId().split(",")) { + assertEquals(32, msgId.length()); + msgIds.add(msgId); + } + assertEquals(messages.size(), msgIds.size()); + + List decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip()); + assertEquals(decodeMsgs.size(), decodeMsgs.size()); + long queueOffset = decodeMsgs.get(0).getQueueOffset(); + long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp(); + for (int i = 0; i < messages.size(); i++) { + assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic()); + assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody())); + assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags()); + + assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString()); + + assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp()); + assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp()); + assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset()); + } + + // 未修改消息时期望通过 + buff.flip(); + for (int i = 0; i < messages.size() - 1; i++) { + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertTrue(request.isSuccess()); + } + // 修改最后一条消息的properties,期望校验失败 + int idx = buff.limit() - (propertiesLen / 2); + buff.put(idx, (byte) (buff.get(idx) + 1)); + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertFalse(request.isSuccess()); + } +}