-
Notifications
You must be signed in to change notification settings - Fork 11.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #7300] jRaft-Controller Implemention #7301
Conversation
@@ -275,13 +275,17 @@ public class RequestCode { | |||
* clean broker data | |||
*/ | |||
public static final int CLEAN_BROKER_DATA = 1011; | |||
public static final int CONTROLLER_GET_NEXT_BROKER_ID = 1012; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这些协议上的内容需要谨慎评估下
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Request changes
# Conflicts: # controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java # controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java # controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
...oller/src/main/java/org/apache/rocketmq/controller/impl/manager/RaftReplicasInfoManager.java
Outdated
Show resolved
Hide resolved
controller/src/main/java/org/apache/rocketmq/controller/impl/JRaftControllerStateMachine.java
Outdated
Show resolved
Hide resolved
...roller/src/main/java/org/apache/rocketmq/controller/impl/task/BrokerCloseChannelRequest.java
Show resolved
Hide resolved
...oller/src/main/java/org/apache/rocketmq/controller/impl/manager/RaftReplicasInfoManager.java
Outdated
Show resolved
Hide resolved
} | ||
ControllerClosure closure = new ControllerClosure(request); | ||
Task task = closure.taskWithThisClosure(); | ||
if (task != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applyToJRaft方法调用jraft FSMCallerImpl#doCommitted 期间,doCommitted方法里会回调Closure,最后调到ControllerClosure#run方法,这里会返回结果给调用方。但之后doCommitted才会调用doApplyTasks,doApplyTasks方法最终调用JRaftControllerStateMachine#processEvent。
这里会不会有有并发问题,即processEvent方法还没有结束,ControllerClosure#run方法就返回结果给调用方了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在processEvent的最后才会执行ControllerClosure#run,之后ControllerClosure才会完成他的异步回调并返回给调用方,调用方得到返回时processEvent已经执行完了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yulangz ,你看图中的逻辑,从上到下的顺序,如果在iterImpl#done不为nul,就回调ControllerClosure#run方法,而
processEvent要在doApplyTasks方法才被回调。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这也许是 JRaft 的一个 Issue,我参照的 JRaft 的示例CounterStateMachine#OnApply编写的这段代码,并且预期 ControllerClosure#run 应该在 processEvent 尾部被调用
我暂时不知道这是不是 JRaft 本身实现的问题,但是我在测试过程中没有因此而导致什么错误。你可以去 JRaft 询问一下这个问题。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yulangz 我初步翻看 JRaft CounterService Demo,这个Demo和你提的PR还是不一样,Demo 中相关数据变更都在CounterStateMachine中,而你提的PR在processEvent处理业务业务逻辑(数据变更)、ControllerClosure#run返回结果,这二个操作并非原子,存在并发风险。
至于你反馈测试正常,并发问题发生概率本身就比较低,不一定可以测出。
建议你针对PR,评估下这块怎么优化,确保这二个操作是原子的 。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
controller/src/main/java/org/apache/rocketmq/controller/impl/JRaftControllerStateMachine.java
Outdated
Show resolved
Hide resolved
controller/src/main/java/org/apache/rocketmq/controller/impl/JRaftControllerStateMachine.java
Outdated
Show resolved
Hide resolved
controller/src/main/java/org/apache/rocketmq/controller/impl/JRaftControllerStateMachine.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java
Outdated
Show resolved
Hide resolved
@@ -24,7 +24,9 @@ public class ControllerConfig { | |||
|
|||
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); | |||
private String configStorePath = System.getProperty("user.home") + File.separator + "controller" + File.separator + "controller.properties"; | |||
|
|||
public static final String DLEDGER_CONTROLLER = "DLedger"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be better to use an enum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using enum is indeed more elegant, but I think using string constants here is sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
controller/src/main/java/org/apache/rocketmq/controller/Controller.java
Outdated
Show resolved
Hide resolved
controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
Outdated
Show resolved
Hide resolved
@@ -16,12 +16,6 @@ | |||
*/ | |||
package org.apache.rocketmq.controller; | |||
|
|||
import java.io.BufferedInputStream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to only format the code you change or add. It may cause confusion for reviewers if formatting the
irrelevant file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think It's hard to find your real changes in the file diff if you format the whole file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will rollback it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you :)
@@ -303,7 +303,7 @@ private boolean appendToDLedgerAndWait(final AppendEntryRequest request) { | |||
attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build()); | |||
return false; | |||
} | |||
dLedgerFuture.get(5, TimeUnit.SECONDS); | |||
dLedgerFuture.get(50, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why increase this value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an accident, already rollback.
import org.apache.rocketmq.common.constant.LoggerName; | ||
import org.apache.rocketmq.controller.impl.event.EventMessage; | ||
import org.apache.rocketmq.controller.impl.event.EventSerializer; | ||
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager; | ||
import org.apache.rocketmq.logging.org.slf4j.Logger; | ||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.CompletableFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, did you import the rocketmq-code-style
, cause it's need not change in my IDE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have imported rocketmq-code-style
, and I have rollback the modification of style only for this file.
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
Outdated
Show resolved
Hide resolved
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
Show resolved
Hide resolved
fix code style rollback an unexpected modification.
Hello, I am replicating and enhancing the jraft experiment. I would like to consult you on the following questions:
|
nodeOptions.setInitialConf(initConf); | ||
|
||
FileUtils.forceMkdir(new File(controllerConfig.getControllerStorePath())); | ||
nodeOptions.setLogUri(controllerConfig.getControllerStorePath() + File.separator + "log"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the path will always be "xxx/DLedgerController/xxxx", even if i set the controllerType as "jRaft".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
try { | ||
RemotingCommand remotingCommand = future.get(5, java.util.concurrent.TimeUnit.SECONDS); | ||
if (remotingCommand.getCode() != ResponseCode.SUCCESS) { | ||
throw new RuntimeException("on broker heartbeat return invalid code, code: " + remotingCommand.getCode()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
private int jRaftElectionTimeoutMs = 1000; | ||
private int jRaftSnapshotIntervalSecs = 3600; | ||
private String jRaftGroupId = "jRaft-Controller"; | ||
private String jRaftServerId = "localhost:9880"; | ||
private String jRaftInitConf = "localhost:9880,localhost:9881,localhost:9882"; | ||
private String jRaftControllerRPCAddr = "localhost:9770,localhost:9771,localhost:9772"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving these configurations to a jraftConfig class will make it clearer.
nettyServerConfig.setListenPort(Integer.parseInt(this.peerIdToAddr.get(serverId).split(":")[1])); | ||
remotingServer = new NettyRemotingServer(nettyServerConfig, channelEventListener); | ||
|
||
this.node = this.raftGroupService.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Placing this line in startup function would be better. Or is there any reason why it must be placed in initialization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if (remotingCommand.getCode() != ResponseCode.SUCCESS) { | ||
throw new RuntimeException("on broker heartbeat return invalid code, code: " + remotingCommand.getCode()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to handle the return code "CONTROLLER_NOT_LEADER" here. Broker will send heartbeat to all controllers and folllower controllers will return "CONTROLLER_NOT_LEADER".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
<fileNamePattern> | ||
${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}otherdays${file.separator}dledger.%i.log.gz | ||
</fileNamePattern> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
能不能像DLedgerController那样(dledger内部日志和controller业务日志分离),jraft内部日志放在jraft.log,controller的业务日志放在controller.log,现在开启jraftController后日志都混在一起(jraft日志既在controller.log,也有在controller_default.log),不是很方便进行排查。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public ControllerResult<CheckNotActiveBrokerResponse> checkNotActiveBroker(CheckNotActiveBrokerRequest request) { | ||
List<BrokerIdentityInfo> notActiveBrokerIdentityInfoList = new ArrayList<>(); | ||
long checkTime = request.getCheckTimeMillis(); | ||
final Iterator<Map.Entry<BrokerIdentityInfo, BrokerLiveInfo>> iterator = this.brokerLiveTable.entrySet().iterator(); | ||
while (iterator.hasNext()) { | ||
final Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> next = iterator.next(); | ||
long last = next.getValue().getLastUpdateTimestamp(); | ||
long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis(); | ||
if (checkTime - last > timeoutMillis) { | ||
notActiveBrokerIdentityInfoList.add(next.getKey()); | ||
iterator.remove(); | ||
log.warn("Broker expired, brokerInfo {}, expired {}ms", next.getKey(), timeoutMillis); | ||
} | ||
} | ||
List<String> needReElectBrokerNames = scanNeedReelectBrokerSets(new BrokerValidPredicate() { | ||
@Override | ||
public boolean check(String clusterName, String brokerName, Long brokerId) { | ||
return !isBrokerActive(clusterName, brokerName, brokerId, checkTime); | ||
} | ||
}); | ||
Set<String> alreadyReportedBrokerName = notActiveBrokerIdentityInfoList.stream() | ||
.map(BrokerIdentityInfo::getBrokerName) | ||
.collect(Collectors.toSet()); | ||
notActiveBrokerIdentityInfoList.addAll(needReElectBrokerNames.stream() | ||
.filter(brokerName -> !alreadyReportedBrokerName.contains(brokerName)) | ||
.map(brokerName -> new BrokerIdentityInfo(null, brokerName, null)) | ||
.collect(Collectors.toList())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里有两个问题,为了方便我用中文描述:
- checkNotActiveBroker和electMaster是两种日志,一般先执行checkNotActiveBroker,再执行electMaster,如果checkNotActiveBroker执行成功了,brokerLiveTable就移除了这个broker,但electMaster日志没有应用成功(假设该日志controller切换后被截断了)没有选新主,那么下次checkNotActiveBroker就不会再去扫描到这个broker不活跃,electMaster行为就丢失了,也就是broker无法正常切换。
- 还有一个是如果controller全部下线,再重新上线,预期我们应该不影响到broker正常运行,但当前的设计可能在心跳前先调用checkNotActiveBroker触发broker 重新选举,这个需要避免。
1 yes
broker
|
I will take over this pull request later. cc @yulangz |
# Conflicts: # WORKSPACE # controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java # controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
…rst heartbeat, avoid to elect again
int jRaftElectionTimeoutMs = 1000; | ||
|
||
int jRaftScanWaitTimeoutMs = 1000; | ||
int jRaftSnapshotIntervalSecs = 3600; | ||
String jRaftGroupId = "jRaft-Controller"; | ||
String jRaftServerId = "localhost:9880"; | ||
String jRaftInitConf = "localhost:9880,localhost:9881,localhost:9882"; | ||
String jRaftControllerRPCAddr = "localhost:9770,localhost:9771,localhost:9772"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to add private embellishments.
@@ -107,6 +107,8 @@ maven_install( | |||
"com.adobe.testing:s3mock-junit4:2.11.0", | |||
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0", | |||
"org.apache.rocketmq:rocketmq-rocksdb:1.0.2", | |||
"com.alipay.sofa:jraft-core:1.3.11", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be 1.3.14?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes
Fixes #7300
Brief Description
Use JRaft to Implement Controller.
New Configuration
Design
On the Broker side, the JRaft Controller did not make any modifications and continued to use all the designs and concepts of DLedger Controller, such as Epoch and SyncStateSet.
On the Controller side, JRaft Controller not only migrated, but also refactored the original components and code logic, not only supporting SnapShot functionality, but also solving the unreasonable aspects of the original design, making the Controller conform to linear consistency.
How Did You Test This Change?
Use Openchaos to test it.