Skip to content

Commit

Permalink
update remoting
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb committed Sep 19, 2024
1 parent da5cc56 commit 88b14aa
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
Expand Down Expand Up @@ -177,6 +154,31 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -245,7 +247,7 @@ public class BrokerController {
protected CountDownLatch remotingServerStartLatch;
/**
* 处理除了拉取请求以外的请求,生产请求默认由 fastRemotingServer 处理
* 监听端口 listenrPort - 2,默认为 10909
* 监听端口 listenPort - 2,默认为 10909
*/
protected RemotingServer fastRemotingServer;
protected TopicConfigManager topicConfigManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,14 @@
*/
package org.apache.rocketmq.remoting.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.opentelemetry.api.common.AttributesBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;

import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.Pair;
Expand All @@ -65,13 +46,35 @@
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;

import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.opentelemetry.api.common.AttributesBuilder;

public abstract class NettyRemotingAbstract {

Expand All @@ -93,15 +96,15 @@ public abstract class NettyRemotingAbstract {
protected final Semaphore semaphoreAsync;

/**
* 当前正在等待对端返回的请求处理表,其中 opaque 表示请求的编号,全局唯一,通常采用原子递增。
* 当前正在等待对端返回的请求处理表,其中 Key 是 requestId,全局唯一,采用原子递增;Value 是响应结果的 Future 对象
* <p>
* 通常套路是客户端向对端发送网络请求时,通常会采取单一长连接,发送请求后立即返回 ResponseFuture,同时将请求放入到该映射表中,
* 收到客户端响应时(客户端响应会包含请求 code),从该映射表中获取对应的 ResponseFuture,通知调用端的返回结果。
* 这里是 Future 模式在网络编程中的经典运用。
*
* This map caches all on-going requests.
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
protected final ConcurrentMap<Integer /* requestId */, ResponseFuture> responseTable =
new ConcurrentHashMap<>(256);

/**
Expand Down Expand Up @@ -187,9 +190,11 @@ public void putNettyEvent(final NettyEvent event) {
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) {
if (msg != null) {
switch (msg.getType()) {
// 处理对端发来的请求
case REQUEST_COMMAND:
processRequestCommand(ctx, msg);
break;
// 发出请求后,接收和处理对端的响应
case RESPONSE_COMMAND:
processResponseCommand(ctx, msg);
break;
Expand Down Expand Up @@ -388,6 +393,7 @@ private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingC
}

/**
* 处理之前发出请求的响应,从 {@link #responseTable} 中移除对应的 {@link ResponseFuture},并执行回调函数,设置响应结果,
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
Expand All @@ -399,8 +405,10 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);

// 从请求等待响应表中移除等待中的请求
responseTable.remove(opaque);

// 执行响应结束的回调函数
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
Expand All @@ -413,6 +421,7 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
}

/**
* 为响应结果执行回调函数
* Execute callback in callback executor. If callback executor is null, run directly in current thread
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
Expand Down Expand Up @@ -480,6 +489,7 @@ public void clearRPCHook() {
public abstract ExecutorService getCallbackExecutor();

/**
* 周期性扫描,移除过期的请求响应
* <p>
* This method is periodically invoked to scan and expire deprecated request.
* </p>
Expand Down Expand Up @@ -526,6 +536,10 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
return invoke0(channel, request, timeoutMillis);
}

/**
* Netty RPC 调用底层实现
* 使用 Netty Channel 发送请求,将响应 Future 返回,并且会放入 {@link #responseTable} 待响应请求表中
*/
protected CompletableFuture<ResponseFuture> invoke0(final Channel channel, final RemotingCommand request,
final long timeoutMillis) {
CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
Expand All @@ -534,6 +548,7 @@ protected CompletableFuture<ResponseFuture> invoke0(final Channel channel, final

boolean acquired;
try {
// 获取信号量,控制并发
acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
future.completeExceptionally(t);
Expand All @@ -548,6 +563,7 @@ protected CompletableFuture<ResponseFuture> invoke0(final Channel channel, final
return future;
}

// 创建请求的响应 Future,放入待响应请求表 responseTable
AtomicReference<ResponseFuture> responseFutureReference = new AtomicReference<>();
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, request, timeoutMillis - costTime,
new InvokeCallback() {
Expand All @@ -567,13 +583,17 @@ public void operationFail(Throwable throwable) {
}
}, once);
responseFutureReference.set(responseFuture);
// 放入待响应请求表 responseTable
this.responseTable.put(opaque, responseFuture);
try {
// Netty Channel 写数据
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
// 设置请求发送成功
responseFuture.setSendRequestOK(true);
return;
}
// 设置请求发送失败
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
});
Expand Down Expand Up @@ -621,6 +641,12 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request
});
}

/**
* 设置请求失败
* <p>
* 从待响应请求表 {@link #responseTable} 中移除该 {@link ResponseFuture},并设置状态为请求发送失败
* @param opaque requestId
*/
private void requestFail(final int opaque) {
ResponseFuture responseFuture = responseTable.remove(opaque);
if (responseFuture != null) {
Expand Down
Loading

0 comments on commit 88b14aa

Please sign in to comment.