Skip to content

Commit

Permalink
Merge branch 'develop' into dev_gyy_crc
Browse files Browse the repository at this point in the history
  • Loading branch information
guyinyou committed Oct 25, 2023
2 parents 2b9bff7 + f90c553 commit 147d3d7
Show file tree
Hide file tree
Showing 33 changed files with 145 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList, S
return false;
}

if (!fileName.startsWith(fileHome)) {
if (!file.getAbsolutePath().startsWith(fileHome)) {
log.error("Parameter value " + fileName + " is not in the directory rocketmq.home.dir " + fileHome);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public void onChannelIdle(String remoteAddr, Channel channel) {
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getBrokerStatsManager().incChannelIdleNum();
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
Expand Down Expand Up @@ -130,8 +129,6 @@
import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
Expand Down Expand Up @@ -311,8 +308,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
case RequestCode.GET_TOPIC_CONFIG:
return getTopicConfig(ctx, request);
case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC:
Expand Down Expand Up @@ -699,27 +694,6 @@ private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, Rem
return null;
}

private RemotingCommand getBrokerClusterAclConfig(ChannelHandlerContext ctx, RemotingCommand request) {

final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);

try {
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
AclConfig aclConfig = accessValidator.getAllAclConfig();
body.setGlobalWhiteAddrs(aclConfig.getGlobalWhiteAddrs());
body.setPlainAccessConfigs(aclConfig.getPlainAccessConfigs());
response.setCode(ResponseCode.SUCCESS);
response.setBody(body.encode());
response.setRemark(null);
return response;
} catch (Exception e) {
LOGGER.error("Failed to generate a proper getBrokerClusterAclConfig response", e);
}

return null;
}

private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) {
String error = " request type " + request.getCode() + " not supported";
final RemotingCommand response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
final PopMessageRequestHeader requestHeader =
(PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class);
(PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true);
StringBuilder startOffsetInfo = new StringBuilder(64);
StringBuilder msgOffsetInfo = new StringBuilder(64);
StringBuilder orderCountInfo = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -154,7 +153,6 @@
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
Expand Down Expand Up @@ -520,31 +518,6 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,

}

public AclConfig getBrokerClusterConfig(final String addr,
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
return aclConfig;
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);

}

public SendResult sendMessage(
final String addr,
final String brokerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,6 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (String address : addressEntry.getValue().values()) {
if (address.equals(remoteAddr)) {
sendHeartbeatToAllBrokerWithLockV2(false);
break;
}
}
}
}

@Override
Expand All @@ -180,6 +172,18 @@ public void onChannelException(String remoteAddr, Channel channel) {
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (String address : addressEntry.getValue().values()) {
if (address.equals(remoteAddr)) {
sendHeartbeatToAllBrokerWithLockV2(false);
break;
}
}
}
}
};
} else {
channelEventListener = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
Expand All @@ -62,8 +61,6 @@
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetBrokerClusterAclConfigResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeResponseHeader;
Expand Down Expand Up @@ -700,30 +697,6 @@ public RemotingCommand answer(InvocationOnMock mock) {
mqClientAPI.createTopic(brokerAddr, topic, new TopicConfig(), 10000);
}

@Test
public void testGetBrokerClusterConfig() throws Exception {
doAnswer(new Answer<RemotingCommand>() {
@Override
public RemotingCommand answer(InvocationOnMock mock) {
RemotingCommand request = mock.getArgument(1);

RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);
GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
body.setGlobalWhiteAddrs(Collections.singletonList("1.1.1.1"));
body.setPlainAccessConfigs(Collections.singletonList(new PlainAccessConfig()));
response.setBody(body.encode());
response.makeCustomHeaderToNet();
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
return response;
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());

AclConfig aclConfig = mqClientAPI.getBrokerClusterConfig(brokerAddr, 10000);
assertThat(aclConfig.getPlainAccessConfigs()).size().isGreaterThan(0);
assertThat(aclConfig.getGlobalWhiteAddrs()).size().isGreaterThan(0);
}

@Test
public void testViewMessage() throws Exception {
doAnswer(new Answer<RemotingCommand>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void run0() {
LOG.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}, 1000 * 10, this.brokerContainerConfig.getUpdateNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
} else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public class BrokerContainerConfig {
*/
private long fetchNamesrvAddrInterval = 10 * 1000;

/**
* The interval to update namesrv addr, default value is 120 second
*/
private long updateNamesrvAddrInterval = 60 * 2 * 1000;

public String getRocketmqHome() {
return rocketmqHome;
}
Expand Down Expand Up @@ -95,4 +100,12 @@ public long getFetchNamesrvAddrInterval() {
public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
}

public long getUpdateNamesrvAddrInterval() {
return updateNamesrvAddrInterval;
}

public void setUpdateNamesrvAddrInterval(long updateNamesrvAddrInterval) {
this.updateNamesrvAddrInterval = updateNamesrvAddrInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ private synchronized RemotingCommand addBroker(ChannelHandlerContext ctx,
LOGGER.error("addBroker load config from {} failed, {}", configPath, e);
}
} else {
byte[] body = request.getBody();
if (body != null) {
String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
brokerProperties = MixAll.string2Properties(bodyStr);
}
LOGGER.error("addBroker config path is empty");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("addBroker config path is empty");
return response;
}

if (brokerProperties == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void onChannelIdle(String remoteAddr, Channel channel) {
onChannelOperation(CallbackCode.IDLE, remoteAddr, channel);
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {
onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel);
}

private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) {
Collection<InnerBrokerController> masterBrokers = this.brokerContainer.getMasterBrokers();
Collection<InnerSalveBrokerController> slaveBrokers = this.brokerContainer.getSlaveBrokers();
Expand Down Expand Up @@ -103,6 +108,10 @@ public enum CallbackCode {
/**
* onChannelIdle
*/
IDLE
IDLE,
/**
* onChannelActive
*/
ACTIVE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public void onChannelException(String remoteAddr, Channel channel) {
public void onChannelIdle(String remoteAddr, Channel channel) {
this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void run() {

if (msgs != null && !msgs.isEmpty()) {
this.doSomething(msgs);
//update offset to broker
//update offset to local memory, eventually to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
//print pull tps
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ public void onChannelException(String remoteAddr, Channel channel) {
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(channel);
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,9 @@ public void onChannelIdle(String remoteAddr, Channel channel) {
this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
}

@Override
public void onChannelActive(String remoteAddr, Channel channel) {

}
}

Loading

0 comments on commit 147d3d7

Please sign in to comment.