diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java index f6699fa13bc..345aed06c5a 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java @@ -484,7 +484,7 @@ public boolean updateGlobalWhiteAddrsConfig(List globalWhiteAddrsList, S return false; } - if (!fileName.startsWith(fileHome)) { + if (!file.getAbsolutePath().startsWith(fileHome)) { log.error("Parameter value " + fileName + " is not in the directory rocketmq.home.dir " + fileHome); return false; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index cbb81f632b4..7878d0eec59 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -87,4 +87,9 @@ public void onChannelIdle(String remoteAddr, Channel channel) { this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getBrokerStatsManager().incChannelIdleNum(); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index dd4ec960fe5..fbba6633b64 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -49,7 +49,6 @@ import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.LockCallback; @@ -130,8 +129,6 @@ import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody; -import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetBrokerConfigResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; @@ -311,8 +308,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return updateGlobalWhiteAddrsConfig(ctx, request); case RequestCode.RESUME_CHECK_HALF_MESSAGE: return resumeCheckHalfMessage(ctx, request); - case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG: - return getBrokerClusterAclConfig(ctx, request); case RequestCode.GET_TOPIC_CONFIG: return getTopicConfig(ctx, request); case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC: @@ -445,6 +440,13 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext return response; } + if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { + LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", + requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + response.setCode(ResponseCode.SUCCESS); + return response; + } + try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { @@ -523,12 +525,13 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); String topic = requestHeader.getTopic(); - TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); - if (!result.isValid()) { + + if (UtilAll.isBlank(topic)) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(result.getRemark()); + response.setRemark("The specified topic is blank."); return response; } + if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { if (TopicValidator.isSystemTopic(topic)) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -699,27 +702,6 @@ private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, Rem return null; } - private RemotingCommand getBrokerClusterAclConfig(ChannelHandlerContext ctx, RemotingCommand request) { - - final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class); - - try { - AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class); - GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody(); - AclConfig aclConfig = accessValidator.getAllAclConfig(); - body.setGlobalWhiteAddrs(aclConfig.getGlobalWhiteAddrs()); - body.setPlainAccessConfigs(aclConfig.getPlainAccessConfigs()); - response.setCode(ResponseCode.SUCCESS); - response.setBody(body.encode()); - response.setRemark(null); - return response; - } catch (Exception e) { - LOGGER.error("Failed to generate a proper getBrokerClusterAclConfig response", e); - } - - return null; - } - private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) { String error = " request type " + request.getCode() + " not supported"; final RemotingCommand response = @@ -2752,7 +2734,7 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC return response; } final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(), - brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); + brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); response.setBody(entryCache.encode()); response.setCode(ResponseCode.SUCCESS); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index f5d07c5aae9..7ed4d53ab1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -204,7 +204,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); final PopMessageRequestHeader requestHeader = - (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class); + (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true); StringBuilder startOffsetInfo = new StringBuilder(64); StringBuilder msgOffsetInfo = new StringBuilder(64); StringBuilder orderCountInfo = null; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 4f80752e190..3fb689ed6a9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -129,7 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus()); if (brokerController.getBrokerConfig().isEnablePopLog()) { - POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", + POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(), (System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult); } @@ -319,7 +319,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { // offset self amend while (true) { if (!shouldRunPopRevive) { - POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId); + POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId); break; } List messageExts = getReviveMessage(offset, queueId); @@ -351,7 +351,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { noMsgCount = 0; } if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) { - POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId); + POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId); break; } for (MessageExt messageExt : messageExts) { @@ -373,7 +373,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) { String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8); if (brokerController.getBrokerConfig().isEnablePopLog()) { - POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw); + POP_LOGGER.info("reviveQueueId={}, find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw); } AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId); @@ -465,15 +465,15 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) { protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable { ArrayList sortList = consumeReviveObj.genSortList(); - POP_LOGGER.info("reviveQueueId={},ck listSize={}", queueId, sortList.size()); + POP_LOGGER.info("reviveQueueId={}, ck listSize={}", queueId, sortList.size()); if (sortList.size() != 0) { - POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={} ; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(), + POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={}; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(), sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset()); } long newOffset = consumeReviveObj.oldOffset; for (PopCheckPoint popCheckPoint : sortList) { if (!shouldRunPopRevive) { - POP_LOGGER.info("slave skip ck process , revive topic={}, reviveQueueId={}", reviveTopic, queueId); + POP_LOGGER.info("slave skip ck process, revive topic={}, reviveQueueId={}", reviveTopic, queueId); break; } if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) { @@ -483,12 +483,12 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl // check normal topic, skip ck , if normal topic is not exist String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()); if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) { - POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic()); + POP_LOGGER.warn("reviveQueueId={}, can not get normal topic {}, then continue", queueId, popCheckPoint.getTopic()); newOffset = popCheckPoint.getReviveOffset(); continue; } if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) { - POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId()); + POP_LOGGER.warn("reviveQueueId={}, can not get cid {}, then continue", queueId, popCheckPoint.getCId()); newOffset = popCheckPoint.getReviveOffset(); continue; } @@ -520,7 +520,7 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { if (!shouldRunPopRevive) { - POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId); + POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId); return; } inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false)); @@ -646,7 +646,7 @@ public void run() { consumeReviveMessage(consumeReviveObj); if (!shouldRunPopRevive) { - POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId); + POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId); continue; } @@ -662,7 +662,7 @@ public void run() { currentReviveMessageTimestamp = System.currentTimeMillis(); } - POP_LOGGER.info("reviveQueueId={},revive finish,old offset is {}, new offset is {}, ckDelay={} ", + POP_LOGGER.info("reviveQueueId={}, revive finish,old offset is {}, new offset is {}, ckDelay={} ", queueId, consumeReviveObj.oldOffset, consumeReviveObj.newOffset, delay); if (sortList == null || sortList.isEmpty()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index e152be81193..6074081c10e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -53,7 +53,6 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -154,7 +153,6 @@ import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader; -import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; @@ -520,31 +518,6 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr, } - public AclConfig getBrokerClusterConfig(final String addr, - final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - if (response.getBody() != null) { - GetBrokerClusterAclConfigResponseBody body = - GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class); - AclConfig aclConfig = new AclConfig(); - aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs()); - aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs()); - return aclConfig; - } - } - default: - break; - } - throw new MQBrokerException(response.getCode(), response.getRemark(), addr); - - } - public SendResult sendMessage( final String addr, final String brokerName, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 09534a1768b..ba72a6dce77 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -159,14 +159,6 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; @Override public void onChannelConnect(String remoteAddr, Channel channel) { - for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { - for (String address : addressEntry.getValue().values()) { - if (address.equals(remoteAddr)) { - sendHeartbeatToAllBrokerWithLockV2(false); - break; - } - } - } } @Override @@ -180,6 +172,18 @@ public void onChannelException(String remoteAddr, Channel channel) { @Override public void onChannelIdle(String remoteAddr, Channel channel) { } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { + for (String address : addressEntry.getValue().values()) { + if (address.equals(remoteAddr)) { + sendHeartbeatToAllBrokerWithLockV2(false); + break; + } + } + } + } }; } else { channelEventListener = null; diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index c152d38ea50..cf399802baf 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -37,7 +37,6 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -62,8 +61,6 @@ import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; -import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody; -import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeResponseHeader; @@ -700,30 +697,6 @@ public RemotingCommand answer(InvocationOnMock mock) { mqClientAPI.createTopic(brokerAddr, topic, new TopicConfig(), 10000); } - @Test - public void testGetBrokerClusterConfig() throws Exception { - doAnswer(new Answer() { - @Override - public RemotingCommand answer(InvocationOnMock mock) { - RemotingCommand request = mock.getArgument(1); - - RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class); - GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody(); - body.setGlobalWhiteAddrs(Collections.singletonList("1.1.1.1")); - body.setPlainAccessConfigs(Collections.singletonList(new PlainAccessConfig())); - response.setBody(body.encode()); - response.makeCustomHeaderToNet(); - response.setCode(ResponseCode.SUCCESS); - response.setOpaque(request.getOpaque()); - return response; - } - }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); - - AclConfig aclConfig = mqClientAPI.getBrokerClusterConfig(brokerAddr, 10000); - assertThat(aclConfig.getPlainAccessConfigs()).size().isGreaterThan(0); - assertThat(aclConfig.getGlobalWhiteAddrs()).size().isGreaterThan(0); - } - @Test public void testViewMessage() throws Exception { doAnswer(new Answer() { diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index d2b7c374b74..95b6b09b416 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -307,6 +307,20 @@ public static int crc32(byte[] array, int offset, int length) { return (int) (crc32.getValue() & 0x7FFFFFFF); } + public static int crc32(ByteBuffer byteBuffer) { + CRC32 crc32 = new CRC32(); + crc32.update(byteBuffer); + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + + public static int crc32(ByteBuffer[] byteBuffers) { + CRC32 crc32 = new CRC32(); + for (ByteBuffer buffer : byteBuffers) { + crc32.update(buffer); + } + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + public static String bytes2string(byte[] src) { char[] hexChars = new char[src.length * 2]; for (int j = 0; j < src.length; j++) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 87fed7c192e..24f7bdb99a5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -97,6 +97,7 @@ public class MessageConst { public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY"; public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL"; public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS"; + public static final String PROPERTY_CRC32 = "__CRC32#"; /** * properties for DLQ @@ -155,5 +156,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP); STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC); STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID); + STRING_HASH_SET.add(PROPERTY_CRC32); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 6de0b69fb75..b053f827597 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common.message; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.Inet4Address; import java.net.InetAddress; @@ -152,6 +153,34 @@ public static Map decodeProperties(ByteBuffer byteBuffer) { return null; } + public static void createCrc32(final ByteBuffer input, int crc32) { + input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8)); + input.put((byte) NAME_VALUE_SEPARATOR); + for (int i = 0; i < 10; i++) { + byte b = '0'; + if (crc32 > 0) { + b += (byte) (crc32 % 10); + crc32 /= 10; + } + input.put(b); + } + input.put((byte) PROPERTY_SEPARATOR); + } + + public static void createCrc32(final ByteBuf input, int crc32) { + input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8)); + input.writeByte((byte) NAME_VALUE_SEPARATOR); + for (int i = 0; i < 10; i++) { + byte b = '0'; + if (crc32 > 0) { + b += (byte) (crc32 % 10); + crc32 /= 10; + } + input.writeByte(b); + } + input.writeByte((byte) PROPERTY_SEPARATOR); + } + public static MessageExt decode(ByteBuffer byteBuffer) { return decode(byteBuffer, true, true, false); } @@ -601,9 +630,6 @@ public static String messageProperties2String(Map properties) { sb.append(value); sb.append(PROPERTY_SEPARATOR); } - if (sb.length() > 0) { - sb.deleteCharAt(sb.length() - 1); - } return sb.toString(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 91599653c5f..4e5d3419a3a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -20,6 +20,9 @@ import org.apache.rocketmq.common.TopicFilterType; +import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + public class MessageExtBrokerInner extends MessageExt { private static final long serialVersionUID = 7256001576878700634L; private String propertiesString; @@ -55,6 +58,52 @@ public void setPropertiesString(String propertiesString) { this.propertiesString = propertiesString; } + + public void deleteProperty(String name) { + super.clearProperty(name); + if (propertiesString != null) { + int idx0 = 0; + int idx1; + int idx2; + idx1 = propertiesString.indexOf(name, idx0); + if (idx1 != -1) { + // cropping may be required + StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); + while (true) { + int startIdx = idx0; + while (true) { + idx1 = propertiesString.indexOf(name, startIdx); + if (idx1 == -1) { + break; + } + startIdx = idx1 + name.length(); + if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { + if (propertiesString.length() > idx1 + name.length() + && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { + break; + } + } + } + if (idx1 == -1) { + // there are no characters that need to be skipped. Append all remaining characters. + stringBuilder.append(propertiesString, idx0, propertiesString.length()); + break; + } + // there are characters that need to be cropped + stringBuilder.append(propertiesString, idx0, idx1); + // move idx2 to the end of the cropped character + idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); + // all subsequent characters will be cropped + if (idx2 == -1) { + break; + } + idx0 = idx2 + 1; + } + this.setPropertiesString(stringBuilder.toString()); + } + } + } + public long getTagsCode() { return tagsCode; } diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java new file mode 100644 index 00000000000..77d69e5ad76 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageExtBrokerInnerTest { + @Test + public void testDeleteProperty() { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + String propertiesString = ""; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA"); + } +} diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java index 5b712bc30db..d0a550be635 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java @@ -164,7 +164,7 @@ public void run0() { LOG.error("ScheduledTask fetchNameServerAddr exception", e); } } - }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); + }, 1000 * 10, this.brokerContainerConfig.getUpdateNamesrvAddrInterval(), TimeUnit.MILLISECONDS); } else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) { diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java index 77422adde8f..e03b10c34d2 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java @@ -44,6 +44,11 @@ public class BrokerContainerConfig { */ private long fetchNamesrvAddrInterval = 10 * 1000; + /** + * The interval to update namesrv addr, default value is 120 second + */ + private long updateNamesrvAddrInterval = 60 * 2 * 1000; + public String getRocketmqHome() { return rocketmqHome; } @@ -95,4 +100,12 @@ public long getFetchNamesrvAddrInterval() { public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) { this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval; } + + public long getUpdateNamesrvAddrInterval() { + return updateNamesrvAddrInterval; + } + + public void setUpdateNamesrvAddrInterval(long updateNamesrvAddrInterval) { + this.updateNamesrvAddrInterval = updateNamesrvAddrInterval; + } } diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java index 2ac69112d76..5b825fe811c 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java @@ -91,11 +91,10 @@ private synchronized RemotingCommand addBroker(ChannelHandlerContext ctx, LOGGER.error("addBroker load config from {} failed, {}", configPath, e); } } else { - byte[] body = request.getBody(); - if (body != null) { - String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); - brokerProperties = MixAll.string2Properties(bodyStr); - } + LOGGER.error("addBroker config path is empty"); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("addBroker config path is empty"); + return response; } if (brokerProperties == null) { diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java index 8bf4b4a33d0..90c912247ef 100644 --- a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java +++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java @@ -49,6 +49,11 @@ public void onChannelIdle(String remoteAddr, Channel channel) { onChannelOperation(CallbackCode.IDLE, remoteAddr, channel); } + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel); + } + private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) { Collection masterBrokers = this.brokerContainer.getMasterBrokers(); Collection slaveBrokers = this.brokerContainer.getSlaveBrokers(); @@ -103,6 +108,10 @@ public enum CallbackCode { /** * onChannelIdle */ - IDLE + IDLE, + /** + * onChannelActive + */ + ACTIVE } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java index 652a9eeb0d6..d22d0b6069b 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java @@ -48,4 +48,9 @@ public void onChannelException(String remoteAddr, Channel channel) { public void onChannelIdle(String remoteAddr, Channel channel) { this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/docs/cn/Debug_In_Idea.md b/docs/cn/Debug_In_Idea.md new file mode 100644 index 00000000000..fd01751ee99 --- /dev/null +++ b/docs/cn/Debug_In_Idea.md @@ -0,0 +1,55 @@ +## 本地调试RocketMQ + +### Step0: 解决依赖问题 +1. 运行前下载RocketMQ需要的maven依赖,可以使用`mvn clean install -Dmaven.test.skip=true` +2. 确保本地能够编译通过 + +### Step1: 启动NameServer +1. NamerServer的启动类在`org.apache.rocketmq.namesrv.NamesrvStartup` +2. `Idea-Edit Configurations`中添加运行参数 `ROCKETMQ_HOME=` +![Idea_config_nameserver.png](image/Idea_config_nameserver.png) +3. 运行NameServer,观察到如下日志输出则启动成功 +```shell +The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876 +``` + +### Step2: 启动Broker +1. Broker的启动类在`org.apache.rocketmq.broker.BrokerStartup` +2. 创建`/rocketmq/conf/broker.conf`文件或直接在官方release发布包中拷贝即可 +```shell +# broker.conf + +brokerClusterName = DefaultCluster +brokerName = broker-a +brokerId = 0 +deleteWhen = 04 +fileReservedTime = 48 +brokerRole = ASYNC_MASTER +flushDiskType = ASYNC_FLUSH +namesrvAddr = 127.0.0.1:9876 # name server地址 +``` +3. `Idea-Edit Configurations`中添加运行参数 `ROCKETMQ_HOME=` 以及环境变量`-c /Users/xxx/rocketmq/conf/broker.conf` +![Idea_config_broker.png](image/Idea_config_broker.png) +4. 运行Broker,观察到如下日志则启动成功 +```shell +The broker[broker-a,192.169.1.2:10911] boot success... +``` + +### Step3: 发送或消费消息 +至此已经完成了RocketMQ的启动,可以使用`/example`里的示例进行收发消息 + +### 补充:本地启动Proxy +1. RocketMQ5.x支持了Proxy模式,使用`LOCAL`模式可以免去`Step2`,启动类在`org.apache.rocketmq.proxy.ProxyStartup` +2. `Idea-Edit Configurations`中添加运行参数 `ROCKETMQ_HOME=` +3. 在`/conf/`下新建配置文件`rmq-proxy.json` +```json +{ + "rocketMQClusterName": "DefaultCluster", + "nameSrvAddr": "127.0.0.1:9876", + "proxyMode": "local" +} +``` +4. 运行Proxy,观察到如下日志则启动成功 +```shell +Sat Aug 26 15:29:33 CST 2023 rocketmq-proxy startup successfully +``` \ No newline at end of file diff --git a/docs/cn/image/Idea_config_broker.png b/docs/cn/image/Idea_config_broker.png new file mode 100644 index 00000000000..6fbedcfb627 Binary files /dev/null and b/docs/cn/image/Idea_config_broker.png differ diff --git a/docs/cn/image/Idea_config_nameserver.png b/docs/cn/image/Idea_config_nameserver.png new file mode 100644 index 00000000000..65edd991135 Binary files /dev/null and b/docs/cn/image/Idea_config_nameserver.png differ diff --git a/docs/en/Debug_In_Idea.md b/docs/en/Debug_In_Idea.md new file mode 100644 index 00000000000..9967980671f --- /dev/null +++ b/docs/en/Debug_In_Idea.md @@ -0,0 +1,55 @@ +## How to Debug RocketMQ in Idea + +### Step0: Resolve dependencies +1. To download the Maven dependencies required for running RocketMQ, you can use the following command:`mvn clean install -Dmaven.test.skip=true` +2. Ensure successful local compilation. + +### Step1: Start NameServer +1. The startup class for NameServer is located in `org.apache.rocketmq.namesrv.NamesrvStartup`. +2. Add runtime `ROCKETMQ_HOME=` parameters in `Idea-Edit Configurations`. +![Idea_config_nameserver.png](../cn/image/Idea_config_nameserver.png) +3. Run NameServer and if the following log output is observed, it indicates successful startup. +```shell +The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876 +``` + +### Step2: Start Broker +1. The startup class for Broker is located in`org.apache.rocketmq.broker.BrokerStartup` +2. Create the `/rocketmq/conf/broker.conf` file or simply copy it from the official release package. +```shell +# broker.conf + +brokerClusterName = DefaultCluster +brokerName = broker-a +brokerId = 0 +deleteWhen = 04 +fileReservedTime = 48 +brokerRole = ASYNC_MASTER +flushDiskType = ASYNC_FLUSH +namesrvAddr = 127.0.0.1:9876 # name server地址 +``` +3. Add the runtime parameter `ROCKETMQ_HOME=` and the environment variable `-c /Users/xxx/rocketmq/conf/broker.conf` in `Idea-Edit Configurations`. +![Idea_config_broker.png](../cn/image/Idea_config_broker.png) +4. Run the Broker and if the following log is observed, it indicates successful startup. +```shell +The broker[broker-a,192.169.1.2:10911] boot success... +``` + +### Step3: Send or Consume Messages +RocketMQ startup is now complete. You can use the examples provided in `/example` to send and consume messages. + +### Additional: Start the Proxy locally. +1. RocketMQ 5.x introduced the Proxy mode. Using the `LOCAL` mode eliminates the need for `Step2`. The startup class is located at `org.apache.rocketmq.proxy.ProxyStartup`. +2. Add the runtime parameter `ROCKETMQ_HOME=` in `Idea-Edit Configurations`. +3. Create a new configuration file named `rmq-proxy.json` in the `/conf/` directory. +```json +{ + "rocketMQClusterName": "DefaultCluster", + "nameSrvAddr": "127.0.0.1:9876", + "proxyMode": "local" +} +``` +4. Run the Proxy, and if the following log is observed, it indicates successful startup. +```shell +Sat Aug 26 15:29:33 CST 2023 rocketmq-proxy startup successfully +``` \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java index 5ac8d247d95..e1a02aa2664 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -75,7 +75,7 @@ public void run() { if (msgs != null && !msgs.isEmpty()) { this.doSomething(msgs); - //update offset to broker + //update offset to local memory, eventually to broker consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); //print pull tps this.incPullTPS(topic, pullResult.getMsgFoundList().size()); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java index 80d9939923f..b527429f77d 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java @@ -46,4 +46,9 @@ public void onChannelException(String remoteAddr, Channel channel) { public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(channel); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java index e213ae85540..74eb6f2db2f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java @@ -49,5 +49,9 @@ public void onChannelIdle(String remoteAddr, Channel channel) { this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); } + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index ca7dcc9eb0c..aaa688fee64 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -104,7 +104,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, Address body = message.getBody(); messageId = MessageClientIDSetter.getUniqID(message); } - RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage()); request.setBody(body); CompletableFuture future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createInvocationChannel(ctx); @@ -162,7 +162,7 @@ public CompletableFuture sendMessageBack(ProxyContext ctx, Rece ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage()); CompletableFuture future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getSendMessageProcessor() @@ -181,7 +181,7 @@ public CompletableFuture endTransactionOneway(ProxyContext ctx, String bro CompletableFuture future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage()); try { brokerController.getEndTransactionProcessor() .processRequest(channelHandlerContext, command); @@ -196,7 +196,7 @@ public CompletableFuture endTransactionOneway(ProxyContext ctx, String bro public CompletableFuture popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, PopMessageRequestHeader requestHeader, long timeoutMillis) { requestHeader.setBornTime(System.currentTimeMillis()); - RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage()); CompletableFuture future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createInvocationChannel(ctx); InvocationContext invocationContext = new InvocationContext(future); @@ -307,7 +307,7 @@ public CompletableFuture changeInvisibleTime(ProxyContext ctx, Receip ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage()); CompletableFuture future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor() @@ -346,7 +346,7 @@ public CompletableFuture ackMessage(ProxyContext ctx, ReceiptHandle h AckMessageRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage()); CompletableFuture future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getAckMessageProcessor() diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java index 73048dbbc24..915cafcd579 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java @@ -16,16 +16,19 @@ */ package org.apache.rocketmq.proxy.service.message; -import java.util.HashMap; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.HashMap; + public class LocalRemotingCommand extends RemotingCommand { - public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) { LocalRemotingCommand cmd = new LocalRemotingCommand(); cmd.setCode(code); + cmd.setLanguage(LanguageCode.getCode(language)); cmd.writeCustomHeader(customHeader); cmd.setExtFields(new HashMap<>()); setCmdVersion(cmd); @@ -37,4 +40,5 @@ public CommandCustomHeader decodeCommandCustomHeader( Class classHeader) throws RemotingCommandException { return classHeader.cast(readCustomHeader()); } + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java index c99133b3a2d..6802e69b90d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java @@ -26,4 +26,6 @@ public interface ChannelEventListener { void onChannelException(final String remoteAddr, final Channel channel); void onChannelIdle(final String remoteAddr, final Channel channel); + + void onChannelActive(final String remoteAddr, final Channel channel); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java index 9ac944aad30..4bc9d57dda0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java @@ -20,5 +20,6 @@ public enum NettyEventType { CONNECT, CLOSE, IDLE, - EXCEPTION + EXCEPTION, + ACTIVE } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 07ace28ea54..62a8a72901c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -701,6 +701,9 @@ public void run() { case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; + case ACTIVE: + listener.onChannelActive(event.getRemoteAddr(), event.getChannel()); + break; default: break; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 4bc51bd833a..9f151913067 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -716,20 +716,25 @@ private Channel createChannel(final String addr) throws InterruptedException { } if (cw != null) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { - if (cw.isOK()) { - LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString()); - } + return waitChannelFuture(addr, cw); + } + + return null; + } + + private Channel waitChannelFuture(String addr, ChannelWrapper cw) { + ChannelFuture channelFuture = cw.getChannelFuture(); + if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { + if (cw.isOK()) { + LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); + return cw.getChannel(); } else { - LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), - channelFuture.toString()); + LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString()); } + } else { + LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), + channelFuture.toString()); } - return null; } @@ -818,8 +823,14 @@ public CompletableFuture invokeImpl(final Channel channel, final long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); stopwatch.stop(); RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); - Channel retryChannel = channelWrapper.getChannel(); - if (channel != retryChannel) { + retryRequest.setBody(request.getBody()); + Channel retryChannel; + if (channelWrapper.isOK()) { + retryChannel = channelWrapper.getChannel(); + } else { + retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper); + } + if (retryChannel != null && channel != retryChannel) { return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); } } @@ -994,6 +1005,10 @@ public void updateLastResponseTime() { this.lastResponseTime = System.currentTimeMillis(); } + public String getChannelAddress() { + return channelAddress; + } + public boolean reconnect() { if (lock.writeLock().tryLock()) { try { @@ -1091,6 +1106,17 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock } } + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress); + super.channelActive(ctx); + + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel())); + } + } + @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java index 19280f99672..2df9fbf0278 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java @@ -17,6 +17,11 @@ package org.apache.rocketmq.remoting.protocol; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + public enum LanguageCode { JAVA((byte) 0), CPP((byte) 1), @@ -50,4 +55,10 @@ public static LanguageCode valueOf(byte code) { public byte getCode() { return code; } + + private static final Map MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity())); + + public static LanguageCode getCode(String language) { + return MAP.get(language); + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index d27135132c9..e93072adff8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -89,6 +89,7 @@ public class RemotingCommand { private String remark; private HashMap extFields; private transient CommandCustomHeader customHeader; + private transient CommandCustomHeader cachedHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; @@ -260,10 +261,19 @@ public void writeCustomHeader(CommandCustomHeader customHeader) { public CommandCustomHeader decodeCommandCustomHeader( Class classHeader) throws RemotingCommandException { - return decodeCommandCustomHeader(classHeader, true); + return decodeCommandCustomHeader(classHeader, false); } - public CommandCustomHeader decodeCommandCustomHeader(Class classHeader, + public CommandCustomHeader decodeCommandCustomHeader( + Class classHeader, boolean isCached) throws RemotingCommandException { + if (isCached && cachedHeader != null) { + return cachedHeader; + } + cachedHeader = decodeCommandCustomHeaderDirectly(classHeader, true); + return cachedHeader; + } + + public CommandCustomHeader decodeCommandCustomHeaderDirectly(Class classHeader, boolean useFastEncode) throws RemotingCommandException { CommandCustomHeader objectHeader; try { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 0b1a5e0104b..1811deba20a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -80,6 +80,7 @@ public class RequestCode { public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53; + @Deprecated public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54; public static final int GET_TIMER_CHECK_POINT = 60; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetBrokerClusterAclConfigResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetBrokerClusterAclConfigResponseBody.java deleted file mode 100644 index 4987242c2a9..00000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetBrokerClusterAclConfigResponseBody.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.protocol.header; - -import java.util.List; -import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -public class GetBrokerClusterAclConfigResponseBody extends RemotingSerializable { - - private List globalWhiteAddrs; - - private List plainAccessConfigs; - - public List getGlobalWhiteAddrs() { - return globalWhiteAddrs; - } - - public void setGlobalWhiteAddrs(List globalWhiteAddrs) { - this.globalWhiteAddrs = globalWhiteAddrs; - } - - public List getPlainAccessConfigs() { - return plainAccessConfigs; - } - - public void setPlainAccessConfigs(List plainAccessConfigs) { - this.plainAccessConfigs = plainAccessConfigs; - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetBrokerClusterAclConfigResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetBrokerClusterAclConfigResponseHeader.java deleted file mode 100644 index 7de73aa4daa..00000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetBrokerClusterAclConfigResponseHeader.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.protocol.header; - -import java.util.List; -import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class GetBrokerClusterAclConfigResponseHeader implements CommandCustomHeader { - - @CFNotNull - private List plainAccessConfigs; - - @Override - public void checkFields() throws RemotingCommandException { - } - - public List getPlainAccessConfigs() { - return plainAccessConfigs; - } - - public void setPlainAccessConfigs(List plainAccessConfigs) { - this.plainAccessConfigs = plainAccessConfigs; - } -} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/FastCodesHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/FastCodesHeaderTest.java index 6bb100f574f..b6a0d631129 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/FastCodesHeaderTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/FastCodesHeaderTest.java @@ -73,7 +73,7 @@ private HashMap buildExtFields(List fields) { private void check(RemotingCommand command, List fields, Class classHeader) throws Exception { - CommandCustomHeader o1 = command.decodeCommandCustomHeader(classHeader, false); + CommandCustomHeader o1 = command.decodeCommandCustomHeaderDirectly(classHeader, false); CommandCustomHeader o2 = classHeader.getDeclaredConstructor().newInstance(); ((FastCodesHeader)o2).decode(command.getExtFields()); for (Field f : fields) { diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 93102799b7c..3d3ee86b8f1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -73,6 +73,10 @@ public class CommitLog implements Swappable { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // End of file empty MAGIC CODE cbd43194 public final static int BLANK_MAGIC_CODE = -875286124; + /** + * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR] + */ + public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1; protected final MappedFileQueue mappedFileQueue; protected final DefaultMessageStore defaultMessageStore; @@ -96,6 +100,8 @@ public class CommitLog implements Swappable { protected int commitLogSize; + private final boolean enabledAppendPropCRC; + public CommitLog(final DefaultMessageStore messageStore) { String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) { @@ -117,7 +123,9 @@ public CommitLog(final DefaultMessageStore messageStore) { putMessageThreadLocal = new ThreadLocal() { @Override protected PutMessageThreadLocal initialValue() { - return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + return new PutMessageThreadLocal( + defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(), + defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC()); } }; this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); @@ -127,6 +135,8 @@ protected PutMessageThreadLocal initialValue() { this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum()); this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + + this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC(); } public void setFullStorePaths(Set fullStorePaths) { @@ -470,10 +480,16 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, byteBuffer.get(bytesContent, 0, bodyLen); if (checkCRC) { - int crc = UtilAll.crc32(bytesContent, 0, bodyLen); - if (crc != bodyCRC) { - log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); - return new DispatchRequest(-1, false/* success */); + /** + * When the forceVerifyPropCRC = false, + * use original bodyCrc validation. + */ + if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) { + int crc = UtilAll.crc32(bytesContent, 0, bodyLen); + if (crc != bodyCRC) { + log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); + return new DispatchRequest(-1, false/* success */); + } } } } else { @@ -531,6 +547,43 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, } } + if (checkCRC) { + /** + * When the forceVerifyPropCRC = true, + * Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail) + */ + if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) { + int expectedCRC = -1; + if (propertiesMap != null) { + String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32); + if (crc32Str != null) { + expectedCRC = 0; + for (int i = crc32Str.length() - 1; i >= 0; i--) { + int num = crc32Str.charAt(i) - '0'; + expectedCRC *= 10; + expectedCRC += num; + } + } + } + if (expectedCRC > 0) { + ByteBuffer tmpBuffer = byteBuffer.duplicate(); + tmpBuffer.position(tmpBuffer.position() - totalSize); + tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN); + int crc = UtilAll.crc32(tmpBuffer); + if (crc != expectedCRC) { + log.warn( + "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected " + + "CRC={}, actual CRC={}", bodyCRC, crc); + return new DispatchRequest(-1, false/* success */); + } + } else { + log.warn( + "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties"); + return new DispatchRequest(-1, false/* success */); + } + } + } + int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength); if (totalSize != readLength) { doNothingForDeadCode(reconsumeTimes); @@ -846,9 +899,12 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { msg.setStoreTimestamp(System.currentTimeMillis()); } - // Set the message body CRC (consider the most appropriate setting on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); + if (enabledAppendPropCRC) { + // delete crc32 properties if exist + msg.deleteProperty(MessageConst.PROPERTY_CRC32); + } // Back to Results AppendMessageResult result = null; @@ -1764,6 +1820,7 @@ class DefaultAppendMessageCallback implements AppendMessageCallback { private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; // Store the message content private final ByteBuffer msgStoreItemMemory; + private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN; DefaultAppendMessageCallback() { this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); @@ -1837,6 +1894,15 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer pos += 8 + 4 + 8 + ipLen; // refresh store time stamp in lock preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); + if (enabledAppendPropCRC) { + // 18 CRC32 + int checkSize = msgLen - crc32ReservedLength; + ByteBuffer tmpBuffer = preEncodeBuffer.duplicate(); + tmpBuffer.limit(tmpBuffer.position() + checkSize); + int crc32 = UtilAll.crc32(tmpBuffer); + tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength); + MessageDecoder.createCrc32(tmpBuffer, crc32); + } final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS"); @@ -1918,6 +1984,15 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer pos += 8 + 4 + 8 + bornHostLength; // refresh store time stamp in lock messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); + if (enabledAppendPropCRC) { + //append crc32 + int checkSize = msgLen - crc32ReservedLength; + ByteBuffer tmpBuffer = messagesByteBuff.duplicate(); + tmpBuffer.position(msgPos).limit(msgPos + checkSize); + int crc32 = UtilAll.crc32(tmpBuffer); + messagesByteBuff.position(msgPos + checkSize); + MessageDecoder.createCrc32(messagesByteBuff, crc32); + } putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; queueOffset++; diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index ee609a337bc..c1d80872859 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; +import java.nio.ByteBuffer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; @@ -29,8 +30,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; - public class MessageExtEncoder { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private ByteBuf byteBuf; @@ -38,7 +37,13 @@ public class MessageExtEncoder { private int maxMessageBodySize; // The maximum length of the full message. private int maxMessageSize; + private final int crc32ReservedLength; + public MessageExtEncoder(final int maxMessageBodySize) { + this(maxMessageBodySize, false); + } + + public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) { ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; //Reserve 64kb for encoding buffer outside body int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? @@ -46,6 +51,7 @@ public MessageExtEncoder(final int maxMessageBodySize) { byteBuf = alloc.directBuffer(maxMessageSize); this.maxMessageBodySize = maxMessageBodySize; this.maxMessageSize = maxMessageSize; + this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0; } public static int calMsgLength(MessageVersion messageVersion, @@ -81,10 +87,13 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 && propertiesData != null && propertiesData.length > 0 + && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; + + final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; if (propertiesLength > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long. length={}", propertiesData.length); + log.warn("putMessage message properties length too long. length={}", propertiesLength); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } @@ -160,8 +169,14 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { // 17 PROPERTIES this.byteBuf.writeShort((short) propertiesLength); - if (propertiesLength > 0) + if (propertiesLength > crc32ReservedLength) { this.byteBuf.writeBytes(propertiesData); + } + if (needAppendLastPropertySeparator) { + this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR); + } + // 18 CRC32 + this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); return null; } @@ -213,10 +228,11 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - final int topicLengthSize = messageExtBatch.getVersion().getTopicLengthSize(); int totalPropLen = needAppendLastPropertySeparator ? - propertiesLen + batchPropLen + topicLengthSize : propertiesLen + batchPropLen; + propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen; + // properties need to add crc32 + totalPropLen += crc32ReservedLength; final int msgLen = calMsgLength( messageExtBatch.getVersion(), messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen); @@ -278,6 +294,7 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex } this.byteBuf.writeBytes(batchPropData, 0, batchPropLen); } + this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); } putMessageContext.setBatchSize(batchSize); putMessageContext.setPhyPos(new long[batchSize]); @@ -304,8 +321,13 @@ public void updateEncoderBufferCapacity(int newMaxMessageBodySize) { static class PutMessageThreadLocal { private final MessageExtEncoder encoder; private final StringBuilder keyBuilder; + PutMessageThreadLocal(int size) { - encoder = new MessageExtEncoder(size); + this(size, false); + } + + PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) { + encoder = new MessageExtEncoder(size, enabledAppendPropCRC); keyBuilder = new StringBuilder(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 028facbdc6d..8cb3ea6e9ee 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -270,6 +270,12 @@ public class MessageStoreConfig { */ private boolean autoMessageVersionOnTopicLen = true; + /** + * It cannot be changed after the broker is started. + * Modifications need to be restarted to take effect. + */ + private boolean enabledAppendPropCRC = false; + private boolean forceVerifyPropCRC = false; private int travelCqFileNumWhenGetMessage = 1; // Sleep interval between to corrections private int correctLogicMinOffsetSleepInterval = 1; @@ -405,6 +411,14 @@ public class MessageStoreConfig { private int topicQueueLockNum = 32; + public boolean isEnabledAppendPropCRC() { + return enabledAppendPropCRC; + } + + public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) { + this.enabledAppendPropCRC = enabledAppendPropCRC; + } + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -640,6 +654,15 @@ public void setCheckCRCOnRecover(boolean checkCRCOnRecover) { this.checkCRCOnRecover = checkCRCOnRecover; } + public boolean isForceVerifyPropCRC() { + return forceVerifyPropCRC; + } + + public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) { + this.forceVerifyPropCRC = forceVerifyPropCRC; + } + + public String getStorePathCommitLog() { if (storePathCommitLog == null) { return storePathRootDir + File.separator + "commitlog"; diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java new file mode 100644 index 00000000000..c8ed4d74db8 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AppendPropCRCTest { + + AppendMessageCallback callback; + + MessageExtEncoder encoder; + + CommitLog commitLog; + + @Before + public void init() throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); + messageStoreConfig.setForceVerifyPropCRC(true); + messageStoreConfig.setEnabledAppendPropCRC(true); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); + commitLog = new CommitLog(messageStore); + encoder = new MessageExtEncoder(10 * 1024 * 1024, true); + callback = commitLog.new DefaultAppendMessageCallback(); + } + + @After + public void destroy() { + UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore")); + } + + @Test + public void testAppendMessageSucc() throws Exception { + String topic = "test-topic"; + int queue = 0; + int msgNum = 10; + int propertiesLen = 0; + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + msg.putUserProperty("a", "aaaaaaaa"); + msg.putUserProperty("b", "bbbbbbbb"); + msg.putUserProperty("c", "cccccccc"); + msg.putUserProperty("d", "dddddddd"); + msg.putUserProperty("e", "eeeeeeee"); + msg.putUserProperty("f", "ffffffff"); + + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setQueueId(queue); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); + messageExtBrokerInner.setBody(msg.getBody()); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + propertiesLen = messageExtBrokerInner.getPropertiesString().length(); + + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + for (int i = 0; i < msgNum; i++) { + encoder.encode(messageExtBrokerInner); + messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer()); + AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null); + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + } + // Expected to pass when message is not modified + buff.flip(); + for (int i = 0; i < msgNum - 1; i++) { + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertTrue(request.isSuccess()); + } + // Modify the properties of the last message and expect the verification to fail. + int idx = buff.limit() - (propertiesLen / 2); + buff.put(idx, (byte) (buff.get(idx) + 1)); + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertFalse(request.isSuccess()); + } + + @Test + public void testAppendMessageBatchSucc() throws Exception { + List messages = new ArrayList<>(); + String topic = "test-topic"; + int queue = 0; + int propertiesLen = 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + msg.putUserProperty("a", "aaaaaaaa"); + msg.putUserProperty("b", "bbbbbbbb"); + msg.putUserProperty("c", "cccccccc"); + msg.putUserProperty("d", "dddddddd"); + msg.putUserProperty("e", "eeeeeeee"); + msg.putUserProperty("f", "ffffffff"); + String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties()); + propertiesLen = propertiesString.length(); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + //encounter end of file when append half of the data + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); + + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + assertEquals(0, allresult.getWroteOffset()); + assertEquals(0, allresult.getLogicsOffset()); + assertEquals(buff.position(), allresult.getWroteBytes()); + + assertEquals(messages.size(), allresult.getMsgNum()); + + Set msgIds = new HashSet<>(); + for (String msgId : allresult.getMsgId().split(",")) { + assertEquals(32, msgId.length()); + msgIds.add(msgId); + } + assertEquals(messages.size(), msgIds.size()); + + List decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip()); + assertEquals(decodeMsgs.size(), decodeMsgs.size()); + long queueOffset = decodeMsgs.get(0).getQueueOffset(); + long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp(); + for (int i = 0; i < messages.size(); i++) { + assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic()); + assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody())); + assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags()); + + assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString()); + + assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp()); + assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp()); + assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset()); + } + + // Expected to pass when message is not modified + buff.flip(); + for (int i = 0; i < messages.size() - 1; i++) { + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertTrue(request.isSuccess()); + } + // Modify the properties of the last message and expect the verification to fail. + int idx = buff.limit() - (propertiesLen / 2); + buff.put(idx, (byte) (buff.get(idx) + 1)); + DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); + assertFalse(request.isSuccess()); + } +} diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 43ca38eb484..768029ca1af 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -108,7 +108,7 @@ public void testPutMessages() throws Exception { short propertiesLength = (short) propertiesBytes.length; final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1]; + msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1]; j++; } byte[] batchMessageBody = MessageDecoder.encodeMessages(messages); diff --git a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java new file mode 100644 index 00000000000..415dc381175 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageExtBrokerInnerTest { + @Test + public void testDeleteProperty() { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + String propertiesString = ""; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001"); + + propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("KeyA"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA"); + + propertiesString = "__CRC32#\u0001"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("__CRC32#"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty(); + + propertiesString = "__CRC32#"; + messageExtBrokerInner.setPropertiesString(propertiesString); + messageExtBrokerInner.deleteProperty("__CRC32#"); + assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString); + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index db5c5af4cd3..7d659d2f6ae 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -465,7 +465,7 @@ public void testCheckSynchronizingSyncStateSetFlag() throws Exception { // Step2: check flag SynchronizingSyncStateSet Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet()); - Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570); + Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580); Set syncStateSet = masterHAService.getSyncStateSet(); Assert.assertEquals(syncStateSet.size(), 2); Assert.assertTrue(syncStateSet.contains(1L)); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java index 2e028ada32a..58842430483 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java @@ -74,7 +74,7 @@ public void testAppendCommitLog() { ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer(); AppendResult result = flatFile.appendCommitLog(message); Assert.assertEquals(AppendResult.SUCCESS, result); - Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); + Assert.assertEquals(123L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition()); flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java index 1f38d4f6c57..a413f2113e1 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java @@ -47,7 +47,7 @@ public class MessageBufferUtilTest { + 8 //Prepared Transaction Offset + 4 + 0 //BODY + 2 + 0 //TOPIC - + 2 + 30 //properties + + 2 + 31 //properties + 0; public static ByteBuffer buildMockedMessageBuffer() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index f0a08dfb1a3..40bd5d56d30 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -25,7 +25,6 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -231,12 +230,6 @@ public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo( return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr); } - @Override - public AclConfig examineBrokerClusterAclConfig( - String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return defaultMQAdminExtImpl.examineBrokerClusterAclConfig(addr); - } - @Override public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 1ebff6d8afc..331b24d6068 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -45,7 +45,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; @@ -305,12 +304,6 @@ public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo( return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis); } - @Override - public AclConfig examineBrokerClusterAclConfig( - String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterConfig(addr, timeoutMillis); - } - @Override public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 7dcfc4fa5e0..3148fc0987e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -24,7 +24,6 @@ import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -110,9 +109,6 @@ ClusterAclVersionInfo examineBrokerClusterAclVersionInfo( final String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - AclConfig examineBrokerClusterAclConfig(final String addr) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException; - void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 788fa83c2cd..35f00748228 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -29,7 +29,6 @@ import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand; import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand; -import org.apache.rocketmq.tools.command.acl.GetAccessConfigSubCommand; import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand; import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand; import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad; @@ -248,7 +247,6 @@ public static void initCommand() { initCommand(new DeleteAccessConfigSubCommand()); initCommand(new ClusterAclConfigVersionListSubCommand()); initCommand(new UpdateGlobalWhiteAddrSubCommand()); - initCommand(new GetAccessConfigSubCommand()); initCommand(new UpdateStaticTopicSubCommand()); initCommand(new RemappingStaticTopicSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java deleted file mode 100644 index f1c9a14969f..00000000000 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.tools.command.acl; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionGroup; -import org.apache.commons.cli.Options; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.AclConfig; -import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.CommandUtil; -import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.rocketmq.tools.command.SubCommandException; - -import java.lang.reflect.Field; -import java.util.List; -import java.util.Set; - -public class GetAccessConfigSubCommand implements SubCommand { - @Override - public String commandName() { - return "getAclConfig"; - } - - @Override - public String commandAlias() { - return "getAccessConfigSubCommand"; - } - - @Override - public String commandDesc() { - return "List all of acl config information in cluster."; - } - - @Override - public Options buildCommandlineOptions(Options options) { - OptionGroup optionGroup = new OptionGroup(); - - Option opt = new Option("b", "brokerAddr", true, "query acl config version for which broker"); - optionGroup.addOption(opt); - - opt = new Option("c", "clusterName", true, "query acl config version for specified cluster"); - optionGroup.addOption(opt); - - optionGroup.setRequired(true); - options.addOptionGroup(optionGroup); - - return options; - } - - @Override - public void execute(CommandLine commandLine, Options options, - RPCHook rpcHook) throws SubCommandException { - - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); - defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - - try { - - if (commandLine.hasOption('b')) { - String addr = commandLine.getOptionValue('b').trim(); - defaultMQAdminExt.start(); - printClusterBaseInfo(defaultMQAdminExt, addr); - return; - - } else if (commandLine.hasOption('c')) { - String clusterName = commandLine.getOptionValue('c').trim(); - - defaultMQAdminExt.start(); - - Set masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); - for (String addr : masterSet) { - printClusterBaseInfo(defaultMQAdminExt, addr); - } - return; - } - - ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); - } catch (Exception e) { - throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); - } finally { - defaultMQAdminExt.shutdown(); - } - } - - private void printClusterBaseInfo( - final DefaultMQAdminExt defaultMQAdminExt, final String addr) throws - InterruptedException, MQBrokerException, RemotingException, MQClientException, IllegalAccessException { - AclConfig aclConfig = defaultMQAdminExt.examineBrokerClusterAclConfig(addr); - List configs = aclConfig.getPlainAccessConfigs(); - List globalWhiteAddrs = aclConfig.getGlobalWhiteAddrs(); - System.out.printf("\n"); - System.out.printf("%-20s: %s\n", "globalWhiteRemoteAddresses", globalWhiteAddrs.toString()); - System.out.printf("\n"); - System.out.printf("accounts:\n"); - if (configs != null && configs.size() > 0) { - for (PlainAccessConfig config : configs) { - Field[] fields = config.getClass().getDeclaredFields(); - for (Field field : fields) { - field.setAccessible(true); - if (field.get(config) != null) { - System.out.printf("%-1s %-18s: %s\n", "", field.getName(), field.get(config).toString()); - } else { - System.out.printf("%-1s %-18s: %s\n", "", field.getName(), ""); - } - } - System.out.printf("\n"); - } - } - } -} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommandTest.java deleted file mode 100644 index ae4eca4356c..00000000000 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommandTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.tools.command.acl; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.Options; -import org.apache.rocketmq.srvutil.ServerUtil; -import org.junit.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -public class GetAccessConfigSubCommandTest { - - @Test - public void testExecute() { - GetAccessConfigSubCommand cmd = new GetAccessConfigSubCommand(); - Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[] {"-c default-cluster"}; - final CommandLine commandLine = - ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, - cmd.buildCommandlineOptions(options), new DefaultParser()); - assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster"); - } -}