diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java index c7997c47318..acd4df96d28 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java @@ -108,6 +108,13 @@ public String getProperty(final String name) { return this.properties.get(name); } + public boolean hasProperty(final String name) { + if (null == this.properties) { + return false; + } + return this.properties.containsKey(name); + } + public String getTopic() { return topic; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 43e16ddd2d7..17a2f27fa74 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -74,7 +74,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe try { Message message = messageList.get(0); String topic = message.getTopic(); - if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) { + if (isNeedCheckTopicMessageType(message)) { if (topicMessageTypeValidator != null) { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { @@ -261,4 +261,8 @@ public CompletableFuture forwardMessageToDeadLetterQueue(ProxyC return FutureUtils.addExecutor(future, this.executor); } + private boolean isNeedCheckTopicMessageType(Message message) { + return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() + && !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java index 17af0fdcb37..22d9efd9347 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java @@ -21,17 +21,18 @@ import java.time.Duration; import java.util.Map; import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.remoting.protocol.NamespaceUtil; -import org.apache.rocketmq.remoting.protocol.RequestCode; -import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator; import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; public class SendMessageActivity extends AbstractRemotingActivity { TopicMessageTypeValidator topicMessageTypeValidator; @@ -66,7 +67,7 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand String topic = requestHeader.getTopic(); Map property = MessageDecoder.string2messageProperties(requestHeader.getProperties()); TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property); - if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) { + if (isNeedCheckTopicMessageType(property)) { if (topicMessageTypeValidator != null) { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { @@ -87,4 +88,9 @@ protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, Remotin ProxyContext context) throws Exception { return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); } + + private boolean isNeedCheckTopicMessageType(Map property) { + return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() + && !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG); + } }