Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7231] Fix: proxy client language error #7200

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, Address
body = message.getBody();
messageId = MessageClientIDSetter.getUniqID(message);
}
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
request.setBody(body);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
Expand Down Expand Up @@ -162,7 +162,7 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getSendMessageProcessor()
Expand All @@ -181,7 +181,7 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage());
try {
brokerController.getEndTransactionProcessor()
.processRequest(channelHandlerContext, command);
Expand All @@ -196,7 +196,7 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
PopMessageRequestHeader requestHeader, long timeoutMillis) {
requestHeader.setBornTime(System.currentTimeMillis());
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
InvocationContext invocationContext = new InvocationContext(future);
Expand Down Expand Up @@ -307,7 +307,7 @@ public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, Receip
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor()
Expand Down Expand Up @@ -346,7 +346,7 @@ public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, ReceiptHandle h
AckMessageRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getAckMessageProcessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.proxy.service.message;

import java.util.HashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.util.HashMap;

public class LocalRemotingCommand extends RemotingCommand {

public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) {
LocalRemotingCommand cmd = new LocalRemotingCommand();
cmd.setCode(code);
cmd.setLanguage(LanguageCode.getCode(language));
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
Expand All @@ -37,4 +40,5 @@ public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
return classHeader.cast(readCustomHeader());
}

weihubeats marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.rocketmq.remoting.protocol;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum LanguageCode {
JAVA((byte) 0),
CPP((byte) 1),
Expand Down Expand Up @@ -50,4 +55,10 @@ public static LanguageCode valueOf(byte code) {
public byte getCode() {
return code;
}

private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity()));

public static LanguageCode getCode(String language) {
return MAP.get(language);
}
}
Loading