Skip to content

Commit

Permalink
[ISSUE #7437] Add the CRC check of commitlog (#7468)
Browse files Browse the repository at this point in the history
* Added CRC32 check for full data

* add unit test

* add MessageExtBrokerInnerTest

* fix codestyle

* fix codestyle

---------

Co-authored-by: guyinyou <[email protected]>
  • Loading branch information
guyinyou and guyinyou authored Oct 25, 2023
1 parent f90c553 commit 91349f3
Show file tree
Hide file tree
Showing 14 changed files with 630 additions and 21 deletions.
14 changes: 14 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/UtilAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,20 @@ public static int crc32(byte[] array, int offset, int length) {
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

public static int crc32(ByteBuffer byteBuffer) {
CRC32 crc32 = new CRC32();
crc32.update(byteBuffer);
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

public static int crc32(ByteBuffer[] byteBuffers) {
CRC32 crc32 = new CRC32();
for (ByteBuffer buffer : byteBuffers) {
crc32.update(buffer);
}
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

public static String bytes2string(byte[] src) {
char[] hexChars = new char[src.length * 2];
for (int j = 0; j < src.length; j++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
public static final String PROPERTY_CRC32 = "__CRC32#";

/**
* properties for DLQ
Expand Down Expand Up @@ -155,5 +156,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
STRING_HASH_SET.add(PROPERTY_CRC32);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common.message;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
Expand Down Expand Up @@ -152,6 +153,34 @@ public static Map<String, String> decodeProperties(ByteBuffer byteBuffer) {
return null;
}

public static void createCrc32(final ByteBuffer input, int crc32) {
input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
input.put((byte) NAME_VALUE_SEPARATOR);
for (int i = 0; i < 10; i++) {
byte b = '0';
if (crc32 > 0) {
b += (byte) (crc32 % 10);
crc32 /= 10;
}
input.put(b);
}
input.put((byte) PROPERTY_SEPARATOR);
}

public static void createCrc32(final ByteBuf input, int crc32) {
input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
input.writeByte((byte) NAME_VALUE_SEPARATOR);
for (int i = 0; i < 10; i++) {
byte b = '0';
if (crc32 > 0) {
b += (byte) (crc32 % 10);
crc32 /= 10;
}
input.writeByte(b);
}
input.writeByte((byte) PROPERTY_SEPARATOR);
}

public static MessageExt decode(ByteBuffer byteBuffer) {
return decode(byteBuffer, true, true, false);
}
Expand Down Expand Up @@ -601,9 +630,6 @@ public static String messageProperties2String(Map<String, String> properties) {
sb.append(value);
sb.append(PROPERTY_SEPARATOR);
}
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.rocketmq.common.TopicFilterType;

import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;

public class MessageExtBrokerInner extends MessageExt {
private static final long serialVersionUID = 7256001576878700634L;
private String propertiesString;
Expand Down Expand Up @@ -55,6 +58,52 @@ public void setPropertiesString(String propertiesString) {
this.propertiesString = propertiesString;
}


public void deleteProperty(String name) {
super.clearProperty(name);
if (propertiesString != null) {
int idx0 = 0;
int idx1;
int idx2;
idx1 = propertiesString.indexOf(name, idx0);
if (idx1 != -1) {
// cropping may be required
StringBuilder stringBuilder = new StringBuilder(propertiesString.length());
while (true) {
int startIdx = idx0;
while (true) {
idx1 = propertiesString.indexOf(name, startIdx);
if (idx1 == -1) {
break;
}
startIdx = idx1 + name.length();
if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) {
if (propertiesString.length() > idx1 + name.length()
&& propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) {
break;
}
}
}
if (idx1 == -1) {
// there are no characters that need to be skipped. Append all remaining characters.
stringBuilder.append(propertiesString, idx0, propertiesString.length());
break;
}
// there are characters that need to be cropped
stringBuilder.append(propertiesString, idx0, idx1);
// move idx2 to the end of the cropped character
idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1);
// all subsequent characters will be cropped
if (idx2 == -1) {
break;
}
idx0 = idx2 + 1;
}
this.setPropertiesString(stringBuilder.toString());
}
}
}

public long getTagsCode() {
return tagsCode;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;

import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class MessageExtBrokerInnerTest {
@Test
public void testDeleteProperty() {
MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
String propertiesString = "";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");

propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");

propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
}
}
Loading

0 comments on commit 91349f3

Please sign in to comment.