[ISSUE #7300] jRaft-Controller Implemention #7301

Merged
merged 31 commits into from
Jan 29, 2024
Changes from 14 commits
Commits
7529187
jRaft-Controller Implemention
yulangz Sep 5, 2023
2edc6c7
fix bazel build
yulangz Sep 6, 2023
c24e268
reformat code
yulangz Sep 6, 2023
855a8ba
remove fury dependence
yulangz Sep 6, 2023
c973b56
fix bazel build
yulangz Sep 6, 2023
14d45c7
Merge branch 'develop' into jraft-controller
yulangz Sep 21, 2023
1196047
Merge branch 'develop' into jraft-controller
yulangz Sep 21, 2023
d775f87
resolve conflict
yulangz Sep 21, 2023
ed2e82f
clear code
yulangz Sep 21, 2023
8800d69
Optimize code style
yulangz Sep 22, 2023
8b38ee6
Optimize code style
yulangz Sep 27, 2023
e01ef73
Optimize code style
yulangz Sep 28, 2023
a172a60
Merge remote-tracking branch 'origin/develop' into jraft-controller
yulangz Oct 4, 2023
8f3f8c1
revert style only change
yulangz Oct 4, 2023
adffdf2
Merge branch 'apache:develop' into jraft-controller
yulangz Oct 9, 2023
9ce99b6
Update Producer.java
yulangz Oct 9, 2023
6c86484
chore: move raft startup to start method
leizhiyuan Dec 5, 2023
14bac4b
chore: use jraftconfig to collect all configs about jraft
leizhiyuan Dec 5, 2023
863b49e
fix: fix wrong store path because init use string constant
leizhiyuan Dec 5, 2023
05062f3
fix: fix CONTROLLER_NOT_LEADER error in follower
leizhiyuan Dec 5, 2023
27e8134
chore: seperate jraft and controller log
leizhiyuan Dec 5, 2023
44d5717
Merge branch 'develop' into jraft-controller
leizhiyuan Dec 5, 2023
323ea1c
chore: fix conflict with develop
leizhiyuan Dec 5, 2023
03b0cb2
chore: add comment to clear the filter logic
leizhiyuan Dec 6, 2023
15002d0
feat: triggerElectMaster will retry when failed
leizhiyuan Dec 6, 2023
677faf3
feat: when controller all restart, we use a timestamp to trace the fi…
leizhiyuan Dec 6, 2023
2b33ccb
fix: implements Serializable to enable snapshot serialize
leizhiyuan Dec 14, 2023
8e1f71c
fix: use for loop to simple the elect retry
leizhiyuan Dec 18, 2023
2f362ef
chore: update jraft version
leizhiyuan Jan 29, 2024
e0f8ede
Merge branch 'develop' into jraft-controller
leizhiyuan Jan 29, 2024
e5b4ae5
chore: opt import
leizhiyuan Jan 29, 2024
2 changes: 2 additions & 0 deletions WORKSPACE
@@ -107,6 +107,8 @@ maven_install(
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
"com.alipay.sofa:jraft-core:1.3.11",
Review comment (Contributor):
It should be 1.3.14?

"com.alipay.sofa:hessian:3.3.6",
],
fetch_sources = True,
repositories = [
@@ -21,10 +21,11 @@
import org.apache.rocketmq.common.metrics.MetricsExporterType;

public class ControllerConfig {

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String configStorePath = System.getProperty("user.home") + File.separator + "controller" + File.separator + "controller.properties";

public static final String DLEDGER_CONTROLLER = "DLedger";
Review comment (Contributor):
would be better to use an enum

Reply (Contributor Author):
Using enum is indeed more elegant, but I think using string constants here is sufficient.

Reply (Contributor):
ok
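
For reference, the enum alternative raised in this thread could look roughly like the sketch below. `ControllerType`, `getConfigName` and `fromName` are hypothetical names that do not appear in this PR; the string values mirror the `DLEDGER_CONTROLLER` and `JRAFT_CONTROLLER` constants in the diff.

```java
// Hypothetical sketch: an enum replacing the two string constants.
enum ControllerType {
    DLEDGER("DLedger"),
    JRAFT("jRaft");

    // The value as it would appear in the controller configuration.
    private final String configName;

    ControllerType(String configName) {
        this.configName = configName;
    }

    String getConfigName() {
        return configName;
    }

    // Resolve a configured string to an enum constant, rejecting unknown values early.
    static ControllerType fromName(String name) {
        for (ControllerType type : values()) {
            if (type.configName.equalsIgnoreCase(name)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown controller type: " + name);
    }
}

class ControllerTypeDemo {
    public static void main(String[] args) {
        System.out.println(ControllerType.fromName("jRaft").getConfigName());
    }
}
```

With an enum, a misspelled controller type fails at parse time instead of silently falling through to the DLedger branch, which is the main practical difference from the string constants kept in the PR.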

public static final String JRAFT_CONTROLLER = "jRaft";
private String controllerType = DLEDGER_CONTROLLER;
/**
* Interval of periodic scanning for non-active broker;
* Unit: millisecond
@@ -45,7 +46,7 @@ public class ControllerConfig {
private String controllerDLegerPeers;
private String controllerDLegerSelfId;
private int mappedFileSize = 1024 * 1024 * 1024;
private String controllerStorePath = System.getProperty("user.home") + File.separator + "DledgerController";
private String controllerStorePath = System.getProperty("user.home") + File.separator + controllerType + "Controller";

/**
* Whether the controller can elect a master which is not in the syncStateSet.
@@ -67,6 +68,13 @@ public class ControllerConfig {
*/
private long scanInactiveMasterInterval = 5 * 1000;

private int jRaftElectionTimeoutMs = 1000;
private int jRaftSnapshotIntervalSecs = 3600;
private String jRaftGroupId = "jRaft-Controller";
private String jRaftServerId = "localhost:9880";
private String jRaftInitConf = "localhost:9880,localhost:9881,localhost:9882";
private String jRaftControllerRPCAddr = "localhost:9770,localhost:9771,localhost:9772";
Review comment (Contributor):
Moving these configurations to a jraftConfig class will make it clearer.
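
A minimal sketch of what this suggestion could look like is shown below. The commit list mentions a later change that collects the jRaft settings in a "jraftconfig", but that class is not shown here, so `JraftConfigSketch` and `ControllerConfigSketch` are hypothetical names and the layout is an assumption; the field names and defaults are copied from the diff above.

```java
// Hypothetical holder for the jRaft-specific settings (not the class from the PR).
class JraftConfigSketch {
    private int jRaftElectionTimeoutMs = 1000;
    private int jRaftSnapshotIntervalSecs = 3600;
    private String jRaftGroupId = "jRaft-Controller";
    private String jRaftServerId = "localhost:9880";
    private String jRaftInitConf = "localhost:9880,localhost:9881,localhost:9882";
    private String jRaftControllerRPCAddr = "localhost:9770,localhost:9771,localhost:9772";

    int getjRaftElectionTimeoutMs() { return jRaftElectionTimeoutMs; }
    int getjRaftSnapshotIntervalSecs() { return jRaftSnapshotIntervalSecs; }
    String getjRaftGroupId() { return jRaftGroupId; }
    String getjRaftServerId() { return jRaftServerId; }
    void setjRaftServerId(String jRaftServerId) { this.jRaftServerId = jRaftServerId; }
    String getjRaftInitConf() { return jRaftInitConf; }
    String getjRaftControllerRPCAddr() { return jRaftControllerRPCAddr; }
}

// Hypothetical stand-in for ControllerConfig that delegates to the holder.
class ControllerConfigSketch {
    private final JraftConfigSketch jraftConfig = new JraftConfigSketch();

    JraftConfigSketch getJraftConfig() { return jraftConfig; }

    // Same contract as getJRaftAddress() in the diff: the local server id.
    String getJRaftAddress() { return jraftConfig.getjRaftServerId(); }
}

class JraftConfigDemo {
    public static void main(String[] args) {
        ControllerConfigSketch config = new ControllerConfigSketch();
        config.getJraftConfig().setjRaftServerId("localhost:9881");
        System.out.println(config.getJRaftAddress());
    }
}
```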


private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;

private String metricsGrpcExporterTarget = "";
@@ -201,6 +209,10 @@ public String getDLedgerAddress() {
.map(x -> x.split("-")[1]).findFirst().get();
}

public String getJRaftAddress() {
return jRaftServerId;
}

public MetricsExporterType getMetricsExporterType() {
return metricsExporterType;
}
@@ -280,4 +292,60 @@ public boolean isMetricsInDelta() {
public void setMetricsInDelta(boolean metricsInDelta) {
this.metricsInDelta = metricsInDelta;
}

public String getControllerType() {
return controllerType;
}

public void setControllerType(String controllerType) {
this.controllerType = controllerType;
}

public int getjRaftElectionTimeoutMs() {
return jRaftElectionTimeoutMs;
}

public void setjRaftElectionTimeoutMs(int jRaftElectionTimeoutMs) {
this.jRaftElectionTimeoutMs = jRaftElectionTimeoutMs;
}

public int getjRaftSnapshotIntervalSecs() {
return jRaftSnapshotIntervalSecs;
}

public void setjRaftSnapshotIntervalSecs(int jRaftSnapshotIntervalSecs) {
this.jRaftSnapshotIntervalSecs = jRaftSnapshotIntervalSecs;
}

public String getjRaftGroupId() {
return jRaftGroupId;
}

public void setjRaftGroupId(String jRaftGroupId) {
this.jRaftGroupId = jRaftGroupId;
}

public String getjRaftServerId() {
return jRaftServerId;
}

public void setjRaftServerId(String jRaftServerId) {
this.jRaftServerId = jRaftServerId;
}

public String getjRaftInitConf() {
return jRaftInitConf;
}

public void setjRaftInitConf(String jRaftInitConf) {
this.jRaftInitConf = jRaftInitConf;
}

public String getjRaftControllerRPCAddr() {
return jRaftControllerRPCAddr;
}

public void setjRaftControllerRPCAddr(String jRaftControllerRPCAddr) {
this.jRaftControllerRPCAddr = jRaftControllerRPCAddr;
}
}
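
One thing to watch in the `controllerStorePath` initializer in this file: a Java field initializer runs once, at construction time, so a later `setControllerType` call does not change the derived path. The commit list includes "fix: fix wrong store path because init use string constant", which appears related. The sketch below only illustrates the language behaviour and one possible lazy alternative; the class names are hypothetical, `"home"` stands in for `user.home`, and this is not the fix applied in the PR.

```java
import java.io.File;

// Mirrors the initializer pattern from the diff: the path is computed eagerly.
class EagerPathConfig {
    private String controllerType = "DLedger";
    // Evaluated once during construction, using the default controllerType.
    private String controllerStorePath = "home" + File.separator + controllerType + "Controller";

    void setControllerType(String controllerType) { this.controllerType = controllerType; }
    String getControllerStorePath() { return controllerStorePath; }
}

// One possible alternative: derive the default path on read unless it was set explicitly.
class LazyPathConfig {
    private String controllerType = "DLedger";
    private String controllerStorePath; // null means "derive from controllerType"

    void setControllerType(String controllerType) { this.controllerType = controllerType; }
    void setControllerStorePath(String path) { this.controllerStorePath = path; }

    String getControllerStorePath() {
        if (controllerStorePath != null) {
            return controllerStorePath;
        }
        return "home" + File.separator + controllerType + "Controller";
    }
}

class StorePathDemo {
    public static void main(String[] args) {
        EagerPathConfig eager = new EagerPathConfig();
        eager.setControllerType("jRaft");
        System.out.println(eager.getControllerStorePath()); // still ends with DLedgerController

        LazyPathConfig lazy = new LazyPathConfig();
        lazy.setControllerType("jRaft");
        System.out.println(lazy.getControllerStorePath()); // ends with jRaftController
    }
}
```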
4 changes: 3 additions & 1 deletion common/src/main/java/org/apache/rocketmq/common/Pair.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.common;

public class Pair<T1, T2> {
import java.io.Serializable;

public class Pair<T1, T2> implements Serializable {
private T1 object1;
private T2 object2;

4 changes: 3 additions & 1 deletion controller/BUILD.bazel
@@ -51,6 +51,8 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:com_alipay_sofa_jraft_core",
"@maven//:com_alipay_sofa_hessian",
],
)

@@ -69,7 +71,7 @@ java_library(
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_netty_netty_all",
"@maven//:com_google_guava_guava",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
22 changes: 21 additions & 1 deletion controller/pom.xml
@@ -15,7 +15,8 @@
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
@@ -62,5 +63,24 @@
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId>
<version>1.3.11</version>
<exclusions>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
</project>
@@ -17,17 +17,32 @@
package org.apache.rocketmq.controller;

import io.netty.channel.Channel;
import java.util.Map;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;

import java.util.Map;

public interface BrokerHeartbeatManager {
public static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;

public static BrokerHeartbeatManager newBrokerHeartbeatManager(ControllerConfig controllerConfig) {
if (controllerConfig.getControllerType().equals(ControllerConfig.JRAFT_CONTROLLER)) {
return new RaftBrokerHeartBeatManager(controllerConfig);
} else {
return new DefaultBrokerHeartbeatManager(controllerConfig);
}
}

/**
* Initialize the resources.
*/
void initialize();

/**
* Broker new heartbeat.
*/
@@ -67,6 +82,7 @@ void onBrokerHeartbeat(final String clusterName, final String brokerName, final

/**
* Count the number of active brokers in each broker-set of each cluster
*
* @return active brokers count
*/
Map<String/*cluster*/, Map<String/*broker-set*/, Integer/*active broker num*/>> getActiveBrokersNum();
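
The static factory added to this interface selects the heartbeat manager from `controllerType`. The sketch below shows the same selection pattern in isolation, with hypothetical stand-in types (`HeartbeatManagerSketch`, `create`, `describe`) rather than the RocketMQ classes. It uses a constant-first `equals`, because `controllerConfig.getControllerType().equals(...)` as written in the diff throws a `NullPointerException` if the type was set to null; whether a null type should fall back to the default or be rejected is a design decision this sketch does not settle.

```java
// Hypothetical stand-in for BrokerHeartbeatManager with a static factory method.
interface HeartbeatManagerSketch {
    String describe();

    static HeartbeatManagerSketch create(String controllerType) {
        // Constant-first equals is null-safe: a null type falls through to the default.
        if ("jRaft".equals(controllerType)) {
            return () -> "raft";
        }
        return () -> "default";
    }
}

class HeartbeatFactoryDemo {
    public static void main(String[] args) {
        System.out.println(HeartbeatManagerSketch.create("jRaft").describe());
        System.out.println(HeartbeatManagerSketch.create("DLedger").describe());
    }
}
```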
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.controller;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
@@ -26,17 +27,16 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;

import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.JRaftController;
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;
import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -68,12 +68,10 @@ public class ControllerManager {
private final Configuration configuration;
private final RemotingClient remotingClient;
private Controller controller;
private BrokerHeartbeatManager heartbeatManager;
private final BrokerHeartbeatManager heartbeatManager;
private ExecutorService controllerRequestExecutor;
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;

private NotifyService notifyService;

private final NotifyService notifyService;
private ControllerMetricsManager controllerMetricsManager;

public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
@@ -85,7 +83,7 @@ public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig ne
this.configuration = new Configuration(log, this.controllerConfig, this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
this.heartbeatManager = BrokerHeartbeatManager.newBrokerHeartbeatManager(controllerConfig);
this.notifyService = new NotifyService();
}

@@ -100,15 +98,31 @@ public boolean initialize() {
new ThreadFactoryImpl("ControllerRequestExecutorThread_"));

this.notifyService.initialize();
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
}
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerSelfId())) {
throw new IllegalArgumentException("Attribute value controllerDLegerSelfId of ControllerConfig is null or empty");

if (controllerConfig.getControllerType().equals(ControllerConfig.JRAFT_CONTROLLER)) {
if (StringUtils.isEmpty(this.controllerConfig.getjRaftInitConf())) {
throw new IllegalArgumentException("Attribute value jRaftInitConf of ControllerConfig is null or empty");
}
if (StringUtils.isEmpty(this.controllerConfig.getjRaftServerId())) {
throw new IllegalArgumentException("Attribute value jRaftServerId of ControllerConfig is null or empty");
}
try {
this.controller = new JRaftController(controllerConfig, this.brokerHousekeepingService);
((RaftBrokerHeartBeatManager) this.heartbeatManager).setController((JRaftController) this.controller);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
}
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerSelfId())) {
throw new IllegalArgumentException("Attribute value controllerDLegerSelfId of ControllerConfig is null or empty");
}
this.controller = new DLedgerController(this.controllerConfig, this.heartbeatManager::isBrokerActive,
this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));
}
this.controller = new DLedgerController(this.controllerConfig, this.heartbeatManager::isBrokerActive,
this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));

// Initialize the basic resources
this.heartbeatManager.initialize();
@@ -126,10 +140,12 @@ public boolean initialize() {
* something else.
*
* @param clusterName The cluster name of this inactive broker
* @param brokerName The inactive broker name
* @param brokerId The inactive broker id, null means that the election forced to be triggered
* @param brokerName The inactive broker name
* @param brokerId The inactive broker id, null means that the election forced to be triggered
*/
private void onBrokerInactive(String clusterName, String brokerName, Long brokerId) {
log.info("Controller Manager received broker inactive event, clusterName: {}, brokerName: {}, brokerId: {}",
clusterName, brokerName, brokerId);
if (controller.isLeaderState()) {
if (brokerId == null) {
// Means that force triggering election for this broker-set
@@ -188,20 +204,21 @@ public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) {
// Inform all active brokers
final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs();
brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey()))
.forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry));
.forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry));
}
}

/**
* Notify broker that there are roles-changing in controller
*
* @param brokerAddr target broker's address to notify
* @param entry role change entry
* @param entry role change entry
*/
public void doNotifyBrokerRoleChanged(final String brokerAddr, final RoleChangeNotifyEntry entry) {
if (StringUtils.isNoneEmpty(brokerAddr)) {
log.info("Try notify broker {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, entry);
final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), entry.getMasterBrokerId(),
entry.getMasterEpoch(), entry.getSyncStateSetEpoch());
entry.getMasterEpoch(), entry.getSyncStateSetEpoch());
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
request.setBody(new SyncStateSet(entry.getSyncStateSet(), entry.getSyncStateSetEpoch()).encode());
try {
@@ -214,7 +231,7 @@ public void doNotifyBrokerRoleChanged(final String brokerAddr, final RoleChangeN

public void registerProcessor() {
final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
assert controllerRemotingServer != null;
controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
@@ -231,8 +248,8 @@ public void registerProcessor() {
}

public void start() {
this.heartbeatManager.start();
this.controller.startup();
this.heartbeatManager.start();
this.remotingClient.start();
}

@@ -335,7 +352,9 @@ public int hashCode() {

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (this == obj) {
return true;
}
if (!(obj instanceof NotifyTask)) {
return false;
}