Skip to content

Commit

Permalink
Merge branch 'apache:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
fujian-zfj authored Oct 31, 2023
2 parents 973a997 + 26fa050 commit 49c3dc0
Show file tree
Hide file tree
Showing 52 changed files with 909 additions and 438 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList, S
return false;
}

if (!fileName.startsWith(fileHome)) {
if (!file.getAbsolutePath().startsWith(fileHome)) {
log.error("Parameter value " + fileName + " is not in the directory rocketmq.home.dir " + fileHome);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public void onChannelIdle(String remoteAddr, Channel channel) {
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getBrokerStatsManager().incChannelIdleNum();
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
Expand Down Expand Up @@ -130,8 +129,6 @@
import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
Expand Down Expand Up @@ -311,8 +308,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
case RequestCode.GET_TOPIC_CONFIG:
return getTopicConfig(ctx, request);
case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC:
Expand Down Expand Up @@ -445,6 +440,13 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
return response;
}

if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) {
LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
response.setCode(ResponseCode.SUCCESS);
return response;
}

try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
Expand Down Expand Up @@ -523,12 +525,13 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

String topic = requestHeader.getTopic();
TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
if (!result.isValid()) {

if (UtilAll.isBlank(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(result.getRemark());
response.setRemark("The specified topic is blank.");
return response;
}

if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
if (TopicValidator.isSystemTopic(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
Expand Down Expand Up @@ -699,27 +702,6 @@ private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, Rem
return null;
}

private RemotingCommand getBrokerClusterAclConfig(ChannelHandlerContext ctx, RemotingCommand request) {

final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);

try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
AclConfig aclConfig = accessValidator.getAllAclConfig();
body.setGlobalWhiteAddrs(aclConfig.getGlobalWhiteAddrs());
body.setPlainAccessConfigs(aclConfig.getPlainAccessConfigs());
response.setCode(ResponseCode.SUCCESS);
response.setBody(body.encode());
response.setRemark(null);
return response;
} catch (Exception e) {
LOGGER.error("Failed to generate a proper getBrokerClusterAclConfig response", e);
}

return null;
}

private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) {
String error = " request type " + request.getCode() + " not supported";
final RemotingCommand response =
Expand Down Expand Up @@ -2752,7 +2734,7 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC
return response;
}
final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());

response.setBody(entryCache.encode());
response.setCode(ResponseCode.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
final PopMessageRequestHeader requestHeader =
(PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class);
(PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true);
StringBuilder startOffsetInfo = new StringBuilder(64);
StringBuilder msgOffsetInfo = new StringBuilder(64);
StringBuilder orderCountInfo = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
(System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult);
}
Expand Down Expand Up @@ -319,7 +319,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
// offset self amend
while (true) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
List<MessageExt> messageExts = getReviveMessage(offset, queueId);
Expand Down Expand Up @@ -351,7 +351,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
noMsgCount = 0;
}
if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
break;
}
for (MessageExt messageExt : messageExts) {
Expand All @@ -373,7 +373,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
POP_LOGGER.info("reviveQueueId={}, find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
Expand Down Expand Up @@ -465,15 +465,15 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {

protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
ArrayList<PopCheckPoint> sortList = consumeReviveObj.genSortList();
POP_LOGGER.info("reviveQueueId={},ck listSize={}", queueId, sortList.size());
POP_LOGGER.info("reviveQueueId={}, ck listSize={}", queueId, sortList.size());
if (sortList.size() != 0) {
POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={} ; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={}; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset());
}
long newOffset = consumeReviveObj.oldOffset;
for (PopCheckPoint popCheckPoint : sortList) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip ck process , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip ck process, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
Expand All @@ -483,12 +483,12 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
// check normal topic, skip ck , if normal topic is not exist
String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic());
POP_LOGGER.warn("reviveQueueId={}, can not get normal topic {}, then continue", queueId, popCheckPoint.getTopic());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
POP_LOGGER.warn("reviveQueueId={}, can not get cid {}, then continue", queueId, popCheckPoint.getCId());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
Expand Down Expand Up @@ -520,7 +520,7 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
Expand Down Expand Up @@ -646,7 +646,7 @@ public void run() {
consumeReviveMessage(consumeReviveObj);

if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
continue;
}

Expand All @@ -662,7 +662,7 @@ public void run() {
currentReviveMessageTimestamp = System.currentTimeMillis();
}

POP_LOGGER.info("reviveQueueId={},revive finish,old offset is {}, new offset is {}, ckDelay={} ",
POP_LOGGER.info("reviveQueueId={}, revive finish,old offset is {}, new offset is {}, ckDelay={} ",
queueId, consumeReviveObj.oldOffset, consumeReviveObj.newOffset, delay);

if (sortList == null || sortList.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -154,7 +153,6 @@
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
Expand Down Expand Up @@ -520,31 +518,6 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,

}

public AclConfig getBrokerClusterConfig(final String addr,
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
return aclConfig;
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);

}

public SendResult sendMessage(
final String addr,
final String brokerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,6 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (String address : addressEntry.getValue().values()) {
if (address.equals(remoteAddr)) {
sendHeartbeatToAllBrokerWithLockV2(false);
break;
}
}
}
}

@Override
Expand All @@ -180,6 +172,18 @@ public void onChannelException(String remoteAddr, Channel channel) {
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (String address : addressEntry.getValue().values()) {
if (address.equals(remoteAddr)) {
sendHeartbeatToAllBrokerWithLockV2(false);
break;
}
}
}
}
};
} else {
channelEventListener = null;
Expand Down
Loading

0 comments on commit 49c3dc0

Please sign in to comment.