From 2f1c31092260aaedf2b97c291c095be4eb6b2209 Mon Sep 17 00:00:00 2001 From: PiteXChen <775523362@qq.com> Date: Sat, 28 Oct 2023 23:45:49 +0800 Subject: [PATCH] sendMessage Optimize --- .../proxy/service/message/ClusterMessageService.java | 10 ++++++---- .../proxy/service/message/LocalMessageService.java | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index 9e8f448c0a0..fb82421d1c0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -66,12 +66,13 @@ public CompletableFuture> sendMessage(ProxyContext ctx, Address List msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) { CompletableFuture> future; Message message; - if(requestHeader.isBatch()){ + if (msgList.size() == 1) { + message = msgList.get(0); + } else { + requestHeader.setBatch(true); message = MessageBatch.generateFromList(msgList); MessageClientIDSetter.setUniqID(message); ((MessageBatch) message).fillBody(); - } else { - message = msgList.get(0); } future = this.mqClientAPIFactory.getClient().sendMessageAsync( messageQueue.getBrokerAddr(), @@ -143,7 +144,8 @@ public CompletableFuture ackMessage(ProxyContext ctx, ReceiptHandle h } @Override - public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, String consumerGroup, + public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, + String consumerGroup, String topic, long timeoutMillis) { List extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList()); return this.mqClientAPIFactory.getClient().batchAckMessageAsync( diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index a1d79b410e7..bebfaf1f09b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -91,12 +91,13 @@ public LocalMessageService(BrokerController brokerController, ChannelManager cha public CompletableFuture> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, List msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) { Message message; - if (requestHeader.isBatch()) { + if (msgList.size() == 1) { + message = msgList.get(0); + } else { + requestHeader.setBatch(true); message = MessageBatch.generateFromList(msgList); MessageClientIDSetter.setUniqID(message); ((MessageBatch) message).fillBody(); - } else { - message = msgList.get(0); } RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage()); request.setBody(message.getBody()); @@ -171,7 +172,8 @@ public CompletableFuture sendMessageBack(ProxyContext ctx, Rece } @Override - public CompletableFuture endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader, + public CompletableFuture endTransactionOneway(ProxyContext ctx, String brokerName, + EndTransactionRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createChannel(ctx);