Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7300] jRaft-Controller Implemention #7301

Merged
merged 31 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7529187
jRaft-Controller Implemention
yulangz Sep 5, 2023
2edc6c7
fix bazel build
yulangz Sep 6, 2023
c24e268
reformat code
yulangz Sep 6, 2023
855a8ba
remove fury dependence
yulangz Sep 6, 2023
c973b56
fix bazel build
yulangz Sep 6, 2023
14d45c7
Merge branch 'develop' into jraft-controller
yulangz Sep 21, 2023
1196047
Merge branch 'develop' into jraft-controller
yulangz Sep 21, 2023
d775f87
resolve conflict
yulangz Sep 21, 2023
ed2e82f
clear code
yulangz Sep 21, 2023
8800d69
Optimize code style
yulangz Sep 22, 2023
8b38ee6
Optimize code style
yulangz Sep 27, 2023
e01ef73
Optimize code style
yulangz Sep 28, 2023
a172a60
Merge remote-tracking branch 'origin/develop' into jraft-controller
yulangz Oct 4, 2023
8f3f8c1
revert style only change
yulangz Oct 4, 2023
adffdf2
Merge branch 'apache:develop' into jraft-controller
yulangz Oct 9, 2023
9ce99b6
Update Producer.java
yulangz Oct 9, 2023
6c86484
chore: move raft startup to start method
leizhiyuan Dec 5, 2023
14bac4b
chore: use jraftconfig to collect all configs about jraft
leizhiyuan Dec 5, 2023
863b49e
fix: fix wrong store path because init use string constant
leizhiyuan Dec 5, 2023
05062f3
fix: fix CONTROLLER_NOT_LEADER error in follower
leizhiyuan Dec 5, 2023
27e8134
chore: seperate jraft and controller log
leizhiyuan Dec 5, 2023
44d5717
Merge branch 'develop' into jraft-controller
leizhiyuan Dec 5, 2023
323ea1c
chore: fix conflict with develop
leizhiyuan Dec 5, 2023
03b0cb2
chore: add comment to clear the filter logic
leizhiyuan Dec 6, 2023
15002d0
feat: triggerElectMaster will retry when failed
leizhiyuan Dec 6, 2023
677faf3
feat: when controller all restart, we use a timestamp to trace the fi…
leizhiyuan Dec 6, 2023
2b33ccb
fix: implements Serializable to enable snapshot serialize
leizhiyuan Dec 14, 2023
8e1f71c
fix: use for loop to simple the elect retry
leizhiyuan Dec 18, 2023
2f362ef
chore: update jraft version
leizhiyuan Jan 29, 2024
e0f8ede
Merge branch 'develop' into jraft-controller
leizhiyuan Jan 29, 2024
e5b4ae5
chore: opt import
leizhiyuan Jan 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ maven_install(
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
"com.alipay.sofa:jraft-core:1.3.11",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be 1.3.14?

"com.alipay.sofa:hessian:3.3.6",
],
fetch_sources = True,
repositories = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public class ControllerConfig {

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String configStorePath = System.getProperty("user.home") + File.separator + "controller" + File.separator + "controller.properties";

public static final String DLEDGER_CONTROLLER = "DLedger";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be better to use an enum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using enum is indeed more elegant, but I think using string constants here is sufficient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

public static final String JRAFT_CONTROLLER = "jRaft";
private String controllerType = DLEDGER_CONTROLLER;
/**
* Interval of periodic scanning for non-active broker;
* Unit: millisecond
Expand All @@ -45,7 +47,7 @@ public class ControllerConfig {
private String controllerDLegerPeers;
private String controllerDLegerSelfId;
private int mappedFileSize = 1024 * 1024 * 1024;
private String controllerStorePath = System.getProperty("user.home") + File.separator + "DledgerController";
private String controllerStorePath = "";

/**
* Whether the controller can elect a master which is not in the syncStateSet.
Expand All @@ -67,6 +69,13 @@ public class ControllerConfig {
*/
private long scanInactiveMasterInterval = 5 * 1000;

private int jRaftElectionTimeoutMs = 1000;
private int jRaftSnapshotIntervalSecs = 3600;
private String jRaftGroupId = "jRaft-Controller";
private String jRaftServerId = "localhost:9880";
private String jRaftInitConf = "localhost:9880,localhost:9881,localhost:9882";
private String jRaftControllerRPCAddr = "localhost:9770,localhost:9771,localhost:9772";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving these configurations to a jraftConfig class will make it clearer.


private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;

private String metricsGrpcExporterTarget = "";
Expand Down Expand Up @@ -156,6 +165,9 @@ public void setMappedFileSize(int mappedFileSize) {
}

public String getControllerStorePath() {
if (controllerStorePath.isEmpty()) {
yulangz marked this conversation as resolved.
Show resolved Hide resolved
return System.getProperty("user.home") + File.separator + controllerType + "Controller";
}
return controllerStorePath;
}

Expand Down Expand Up @@ -201,6 +213,10 @@ public String getDLedgerAddress() {
.map(x -> x.split("-")[1]).findFirst().get();
}

public String getJRaftAddress() {
return jRaftServerId;
}

public MetricsExporterType getMetricsExporterType() {
return metricsExporterType;
}
Expand Down Expand Up @@ -280,4 +296,60 @@ public boolean isMetricsInDelta() {
public void setMetricsInDelta(boolean metricsInDelta) {
this.metricsInDelta = metricsInDelta;
}

public String getControllerType() {
return controllerType;
}

public void setControllerType(String controllerType) {
this.controllerType = controllerType;
}

public int getjRaftElectionTimeoutMs() {
return jRaftElectionTimeoutMs;
}

public void setjRaftElectionTimeoutMs(int jRaftElectionTimeoutMs) {
this.jRaftElectionTimeoutMs = jRaftElectionTimeoutMs;
}

public int getjRaftSnapshotIntervalSecs() {
return jRaftSnapshotIntervalSecs;
}

public void setjRaftSnapshotIntervalSecs(int jRaftSnapshotIntervalSecs) {
this.jRaftSnapshotIntervalSecs = jRaftSnapshotIntervalSecs;
}

public String getjRaftGroupId() {
return jRaftGroupId;
}

public void setjRaftGroupId(String jRaftGroupId) {
this.jRaftGroupId = jRaftGroupId;
}

public String getjRaftServerId() {
return jRaftServerId;
}

public void setjRaftServerId(String jRaftServerId) {
this.jRaftServerId = jRaftServerId;
}

public String getjRaftInitConf() {
return jRaftInitConf;
}

public void setjRaftInitConf(String jRaftInitConf) {
this.jRaftInitConf = jRaftInitConf;
}

public String getjRaftControllerRPCAddr() {
return jRaftControllerRPCAddr;
}

public void setjRaftControllerRPCAddr(String jRaftControllerRPCAddr) {
this.jRaftControllerRPCAddr = jRaftControllerRPCAddr;
}
}
4 changes: 3 additions & 1 deletion common/src/main/java/org/apache/rocketmq/common/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.common;

public class Pair<T1, T2> {
import java.io.Serializable;

public class Pair<T1, T2> implements Serializable {
private T1 object1;
private T2 object2;

Expand Down
4 changes: 3 additions & 1 deletion controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:com_alipay_sofa_jraft_core",
"@maven//:com_alipay_sofa_hessian",
],
)

Expand All @@ -68,7 +70,7 @@ java_library(
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_netty_netty_all",
"@maven//:com_google_guava_guava",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
Expand Down
22 changes: 21 additions & 1 deletion controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
Expand Down Expand Up @@ -62,5 +63,24 @@
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId>
<version>1.3.11</version>
<exclusions>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,32 @@
package org.apache.rocketmq.controller;

import io.netty.channel.Channel;
import java.util.Map;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;

import java.util.Map;

public interface BrokerHeartbeatManager {
public static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;

public static BrokerHeartbeatManager newBrokerHeartbeatManager(ControllerConfig controllerConfig) {
if (controllerConfig.getControllerType().equals(ControllerConfig.JRAFT_CONTROLLER)) {
return new RaftBrokerHeartBeatManager(controllerConfig);
} else {
return new DefaultBrokerHeartbeatManager(controllerConfig);
}
}

/**
* initialize the resources
*
* @return
*/
void initialize();

/**
* Broker new heartbeat.
*/
Expand Down Expand Up @@ -67,6 +82,7 @@ void onBrokerHeartbeat(final String clusterName, final String brokerName, final

/**
* Count the number of active brokers in each broker-set of each cluster
*
* @return active brokers count
*/
Map<String/*cluster*/, Map<String/*broker-set*/, Integer/*active broker num*/>> getActiveBrokersNum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@

package org.apache.rocketmq.controller;

import java.util.List;
yulangz marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* The api for controller
*/
Expand Down Expand Up @@ -113,6 +113,7 @@ CompletableFuture<RemotingCommand> alterSyncStateSet(

/**
* Add broker's lifecycle listener
*
* @param listener listener
*/
void registerBrokerLifecycleListener(final BrokerLifecycleListener listener);
Expand All @@ -124,7 +125,6 @@ CompletableFuture<RemotingCommand> alterSyncStateSet(

/**
* Clean controller broker data
*
*/
CompletableFuture<RemotingCommand> cleanBrokerData(final CleanControllerBrokerDataRequestHeader requestHeader);
}
Loading