Skip to content

Commit

Permalink
sendMessage Optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
CLFutureX committed Oct 28, 2023
1 parent 3dac0e2 commit 40805be
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ public boolean rejectRequest() {
return false;
}

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend, boolean brokerAllowFlowCtrSuspend)
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend,
boolean brokerAllowFlowCtrSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
Expand Down Expand Up @@ -438,7 +439,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
}

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData){
if (null == subscriptionData) {
LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
Expand Down Expand Up @@ -488,7 +489,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

final MessageStore messageStore = brokerController.getMessageStore();
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
DefaultMessageStore defaultMessageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
boolean cgNeedColdDataFlowCtr = brokerController.getColdDataCgCtrService().isCgNeedColdDataFlowCtr(requestHeader.getConsumerGroup());
if (cgNeedColdDataFlowCtr) {
boolean isMsgLogicCold = defaultMessageStore.getCommitLog()
Expand Down Expand Up @@ -537,7 +538,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
SubscriptionData finalSubscriptionData = subscriptionData;
RemotingCommand finalResponse = response;
messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(), messageFilter)
requestHeader.getMaxMsgNums(), messageFilter)
.thenApply(result -> {
if (null == result) {
finalResponse.setCode(ResponseCode.SYSTEM_ERROR);
Expand Down Expand Up @@ -586,12 +587,13 @@ public boolean hasConsumeMessageHook() {

/**
* Composes the header of the response message to be sent back to the client
* @param requestHeader - the header of the request message
* @param getMessageResult - the result of the GetMessage request
* @param topicSysFlag - the system flag of the topic
*
* @param requestHeader - the header of the request message
* @param getMessageResult - the result of the GetMessage request
* @param topicSysFlag - the system flag of the topic
* @param subscriptionGroupConfig - configuration of the subscription group
* @param response - the response message to be sent back to the client
* @param clientAddress - the address of the client
* @param response - the response message to be sent back to the client
* @param clientAddress - the address of the client
*/
protected void composeResponseHeader(PullMessageRequestHeader requestHeader, GetMessageResult getMessageResult,
int topicSysFlag, SubscriptionGroupConfig subscriptionGroupConfig, RemotingCommand response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public boolean putMessage(final List<MessageExt> msgs) {
msgSize.addAndGet(msg.getBody().length);
}
}
// 为什么不原子自增呢?
msgCount.addAndGet(validMsgCnt);

if (!msgTreeMap.isEmpty() && !this.consuming) {
Expand All @@ -156,7 +155,6 @@ public boolean putMessage(final List<MessageExt> msgs) {
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
// 消息accCnt? 当前拉取消息的总数
this.msgAccCnt = accTotal;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public ScheduledExecutorService getScheduledExecutorService() {
}

private void pullMessage(final PullRequest pullRequest) {
// 以消费者组为单位,一个消费者组对应一个消费者实例,因为一个消费者只能属于一个消费者组
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ private boolean updateProcessQueueTableInRebalance(final String topic, final Set
}

this.removeDirtyOffset(mq);
// 传了topic,却都没有使用
ProcessQueue pq = createProcessQueue(topic);
pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public byte[] encode() {
return MessageDecoder.encodeMessages(messages);
}

public void fillBody(){
public void fillBody() {
super.setBody(MessageDecoder.encodeMessages(messages));
}

Expand Down

0 comments on commit 40805be

Please sign in to comment.