Skip to content

Commit

Permalink
[ISSUE #9075]Avoid message type validate in message sync scenario. (#…
Browse files Browse the repository at this point in the history
…9076)

* Avoid message type validate in message sync scenario.
  • Loading branch information
dingshuangxi888 authored Dec 25, 2024
1 parent 152a955 commit 1c35adb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ public String getProperty(final String name) {
return this.properties.get(name);
}

public boolean hasProperty(final String name) {
if (null == this.properties) {
return false;
}
return this.properties.containsKey(name);
}

public String getTopic() {
return topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
try {
Message message = messageList.get(0);
String topic = message.getTopic();
if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
if (isNeedCheckTopicMessageType(message)) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
Expand Down Expand Up @@ -261,4 +261,8 @@ public CompletableFuture<RemotingCommand> forwardMessageToDeadLetterQueue(ProxyC
return FutureUtils.addExecutor(future, this.executor);
}

private boolean isNeedCheckTopicMessageType(Message message) {
return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
&& !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
import java.time.Duration;
import java.util.Map;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;

public class SendMessageActivity extends AbstractRemotingActivity {
TopicMessageTypeValidator topicMessageTypeValidator;
Expand Down Expand Up @@ -66,7 +67,7 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand
String topic = requestHeader.getTopic();
Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties());
TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property);
if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
if (isNeedCheckTopicMessageType(property)) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
Expand All @@ -87,4 +88,9 @@ protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, Remotin
ProxyContext context) throws Exception {
return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
}

private boolean isNeedCheckTopicMessageType(Map<String, String> property) {
return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
&& !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG);
}
}

0 comments on commit 1c35adb

Please sign in to comment.