Skip to content

Commit

Permalink
[ISSUE #7231] Fix: proxy client language error (#7200)
Browse files Browse the repository at this point in the history
* Adding null does not update

* add langeuga code

* add langeuga code

* add langeuga code

* add langeuga code

* add langeuga code

* Rerun ci

* Rerun ci

* Rerun ci

* remove redundant package imports

* redundant line

* modify the parameter passed as proxyContext to language

* format
  • Loading branch information
weihubeats authored Oct 20, 2023
1 parent dbc633d commit 3968c18
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, Address
body = message.getBody();
messageId = MessageClientIDSetter.getUniqID(message);
}
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
request.setBody(body);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
Expand Down Expand Up @@ -162,7 +162,7 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getSendMessageProcessor()
Expand All @@ -181,7 +181,7 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage());
try {
brokerController.getEndTransactionProcessor()
.processRequest(channelHandlerContext, command);
Expand All @@ -196,7 +196,7 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
PopMessageRequestHeader requestHeader, long timeoutMillis) {
requestHeader.setBornTime(System.currentTimeMillis());
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
InvocationContext invocationContext = new InvocationContext(future);
Expand Down Expand Up @@ -307,7 +307,7 @@ public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, Receip
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor()
Expand Down Expand Up @@ -346,7 +346,7 @@ public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, ReceiptHandle h
AckMessageRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getAckMessageProcessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.proxy.service.message;

import java.util.HashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.util.HashMap;

public class LocalRemotingCommand extends RemotingCommand {

public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) {
LocalRemotingCommand cmd = new LocalRemotingCommand();
cmd.setCode(code);
cmd.setLanguage(LanguageCode.getCode(language));
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
Expand All @@ -37,4 +40,5 @@ public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
return classHeader.cast(readCustomHeader());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.rocketmq.remoting.protocol;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum LanguageCode {
JAVA((byte) 0),
CPP((byte) 1),
Expand Down Expand Up @@ -50,4 +55,10 @@ public static LanguageCode valueOf(byte code) {
public byte getCode() {
return code;
}

private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity()));

public static LanguageCode getCode(String language) {
return MAP.get(language);
}
}

0 comments on commit 3968c18

Please sign in to comment.