Skip to content

Commit

Permalink
sendMessage Optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
CLFutureX committed Oct 28, 2023
1 parent 40805be commit 2f1c310
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, Address
List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<List<SendResult>> future;
Message message;
if(requestHeader.isBatch()){
if (msgList.size() == 1) {
message = msgList.get(0);
} else {
requestHeader.setBatch(true);
message = MessageBatch.generateFromList(msgList);
MessageClientIDSetter.setUniqID(message);
((MessageBatch) message).fillBody();
} else {
message = msgList.get(0);
}
future = this.mqClientAPIFactory.getClient().sendMessageAsync(
messageQueue.getBrokerAddr(),
Expand Down Expand Up @@ -143,7 +144,8 @@ public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, ReceiptHandle h
}

@Override
public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList, String consumerGroup,
public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList,
String consumerGroup,
String topic, long timeoutMillis) {
List<String> extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ public LocalMessageService(BrokerController brokerController, ChannelManager cha
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) {
Message message;
if (requestHeader.isBatch()) {
if (msgList.size() == 1) {
message = msgList.get(0);
} else {
requestHeader.setBatch(true);
message = MessageBatch.generateFromList(msgList);
MessageClientIDSetter.setUniqID(message);
((MessageBatch) message).fillBody();
} else {
message = msgList.get(0);
}
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
request.setBody(message.getBody());
Expand Down Expand Up @@ -171,7 +172,8 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
}

@Override
public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader,
public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName,
EndTransactionRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
Expand Down

0 comments on commit 2f1c310

Please sign in to comment.