diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index ab96c8a26d..5e880ee03e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -18,30 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; -import java.util.stream.Collectors; + import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory; @@ -177,6 +154,31 @@ import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.timer.TimerMetrics; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Collectors; + public class BrokerController { protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); @@ -245,7 +247,7 @@ public class BrokerController { protected CountDownLatch remotingServerStartLatch; /** * 处理除了拉取请求以外的请求,生产请求默认由 fastRemotingServer 处理 - * 监听端口 listenrPort - 2,默认为 10909 + * 监听端口 listenPort - 2,默认为 10909 */ protected RemotingServer fastRemotingServer; protected TopicConfigManager topicConfigManager; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 90c1706853..77a52857e0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -16,33 +16,14 @@ */ package org.apache.rocketmq.remoting.netty; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.opentelemetry.api.common.AttributesBuilder; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import javax.annotation.Nullable; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED; + import org.apache.rocketmq.common.AbortProcessException; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.Pair; @@ -65,13 +46,35 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import javax.annotation.Nullable; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.opentelemetry.api.common.AttributesBuilder; public abstract class NettyRemotingAbstract { @@ -93,7 +96,7 @@ public abstract class NettyRemotingAbstract { protected final Semaphore semaphoreAsync; /** - * 当前正在等待对端返回的请求处理表,其中 opaque 表示请求的编号,全局唯一,通常采用原子递增。 + * 当前正在等待对端返回的请求处理表,其中 Key 是 requestId,全局唯一,采用原子递增;Value 是响应结果的 Future 对象 *

* 通常套路是客户端向对端发送网络请求时,通常会采取单一长连接,发送请求后立即返回 ResponseFuture,同时将请求放入到该映射表中, * 收到客户端响应时(客户端响应会包含请求 code),从该映射表中获取对应的 ResponseFuture,通知调用端的返回结果。 @@ -101,7 +104,7 @@ public abstract class NettyRemotingAbstract { * * This map caches all on-going requests. */ - protected final ConcurrentMap responseTable = + protected final ConcurrentMap responseTable = new ConcurrentHashMap<>(256); /** @@ -187,9 +190,11 @@ public void putNettyEvent(final NettyEvent event) { public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) { if (msg != null) { switch (msg.getType()) { + // 处理对端发来的请求 case REQUEST_COMMAND: processRequestCommand(ctx, msg); break; + // 发出请求后,接收和处理对端的响应 case RESPONSE_COMMAND: processResponseCommand(ctx, msg); break; @@ -388,6 +393,7 @@ private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingC } /** + * 处理之前发出请求的响应,从 {@link #responseTable} 中移除对应的 {@link ResponseFuture},并执行回调函数,设置响应结果, * Process response from remote peer to the previous issued requests. * * @param ctx channel handler context. @@ -399,8 +405,10 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm if (responseFuture != null) { responseFuture.setResponseCommand(cmd); + // 从请求等待响应表中移除等待中的请求 responseTable.remove(opaque); + // 执行响应结束的回调函数 if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { @@ -413,6 +421,7 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm } /** + * 为响应结果执行回调函数 * Execute callback in callback executor. If callback executor is null, run directly in current thread */ private void executeInvokeCallback(final ResponseFuture responseFuture) { @@ -480,6 +489,7 @@ public void clearRPCHook() { public abstract ExecutorService getCallbackExecutor(); /** + * 周期性扫描,移除过期的请求响应 *

* This method is periodically invoked to scan and expire deprecated request. *

@@ -526,6 +536,10 @@ public CompletableFuture invokeImpl(final Channel channel, final return invoke0(channel, request, timeoutMillis); } + /** + * Netty RPC 调用底层实现 + * 使用 Netty Channel 发送请求,将响应 Future 返回,并且会放入 {@link #responseTable} 待响应请求表中 + */ protected CompletableFuture invoke0(final Channel channel, final RemotingCommand request, final long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -534,6 +548,7 @@ protected CompletableFuture invoke0(final Channel channel, final boolean acquired; try { + // 获取信号量,控制并发 acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); } catch (Throwable t) { future.completeExceptionally(t); @@ -548,6 +563,7 @@ protected CompletableFuture invoke0(final Channel channel, final return future; } + // 创建请求的响应 Future,放入待响应请求表 responseTable AtomicReference responseFutureReference = new AtomicReference<>(); final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, request, timeoutMillis - costTime, new InvokeCallback() { @@ -567,13 +583,17 @@ public void operationFail(Throwable throwable) { } }, once); responseFutureReference.set(responseFuture); + // 放入待响应请求表 responseTable this.responseTable.put(opaque, responseFuture); try { + // Netty Channel 写数据 channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> { if (f.isSuccess()) { + // 设置请求发送成功 responseFuture.setSendRequestOK(true); return; } + // 设置请求发送失败 requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); }); @@ -621,6 +641,12 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request }); } + /** + * 设置请求失败 + *

+ * 从待响应请求表 {@link #responseTable} 中移除该 {@link ResponseFuture},并设置状态为请求发送失败 + * @param opaque requestId + */ private void requestFail(final int opaque) { ResponseFuture responseFuture = responseTable.remove(opaque); if (responseFuture != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 3eee63ca63..c66c06bcc4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -19,6 +19,52 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import com.google.common.base.Stopwatch; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.proxy.SocksProxyConfig; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.cert.CertificateException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; @@ -46,49 +92,6 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.security.cert.CertificateException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.Pair; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.utils.ThreadUtils; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.ChannelEventListener; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.remoting.protocol.ResponseCode; -import org.apache.rocketmq.remoting.proxy.SocksProxyConfig; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); @@ -206,8 +209,10 @@ public void start() { new ThreadFactoryImpl("NettyClientWorkerThread_")); } Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) + // 禁用 Nagle,即数据立即发送,如果为 false,当数据包较小时将会在数据累积到一定量后才发送 .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) + // 连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .handler(new ChannelInitializer() { @Override @@ -223,10 +228,15 @@ public void initChannel(SocketChannel ch) throws Exception { } ch.pipeline().addLast( nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup, + // 编码器 new NettyEncoder(), + // 解码器 new NettyDecoder(), + // 空闲检测,默认空闲时间为 120s new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), + // 连接管理器,打印连接相关的日志 new NettyConnectManageHandler(), + // 客户端业务处理器,处理客户端收到的消息 new NettyClientHandler()); } }); @@ -626,6 +636,9 @@ private void updateChannelLastResponseTime(final String addr) { } } + /** + * 获取或者创建 {@link Channel}(异步) + */ private ChannelFuture getAndCreateChannelAsync(final String addr) throws InterruptedException { // 如果请求的地址为空,则请求 Nameserver,否则请求 Broker if (null == addr) { @@ -642,6 +655,9 @@ private ChannelFuture getAndCreateChannelAsync(final String addr) throws Interru return this.createChannelAsync(addr); } + /** + * 获取或创建 {@link Channel}(同步) + */ private Channel getAndCreateChannel(final String addr) throws InterruptedException { ChannelFuture channelFuture = getAndCreateChannelAsync(addr); if (channelFuture == null) { @@ -736,6 +752,10 @@ private ChannelFuture createChannelAsync(final String addr) throws InterruptedEx return null; } + /** + * 用客户端 {@link #bootstrap} 创建连接,并加入到 {@link #channelTables} 缓存 + * @return {@link ChannelFuture} + */ private ChannelWrapper createChannel(String addr) { String[] hostAndPort = getHostAndPort(addr); ChannelFuture channelFuture = fetchBootstrap(addr) @@ -757,6 +777,7 @@ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis invokeCallback.operationFail(new RemotingConnectException(addr)); return; } + // Channel 创建成功之后触发 channelFuture.addListener(future -> { if (future.isSuccess()) { Channel channel = channelFuture.channel(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 8eda006c6a..ef8864e9cc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -16,6 +16,41 @@ */ package org.apache.rocketmq.remoting.netty; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.HAProxyConstants; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.BinaryUtil; +import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.TlsMode; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.time.Duration; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -54,39 +89,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.cert.CertificateException; -import java.time.Duration; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.Pair; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.constant.HAProxyConstants; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.utils.BinaryUtil; -import org.apache.rocketmq.common.utils.NetworkUtil; -import org.apache.rocketmq.common.utils.ThreadUtils; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.ChannelEventListener; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RemotingServer; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.TlsMode; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; /** * Netty服务端实现 @@ -123,11 +125,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private final HashedWheelTimer timer = new HashedWheelTimer(r -> new Thread(r, "ServerHouseKeepingService")); /** - * Netty Handler 执行线程组,执行 TLS、编码、解码操作 + * Netty Handler 默认执行线程组,执行 TLS、编码、解码操作 */ private DefaultEventExecutorGroup defaultEventExecutorGroup; /** + * 服务端容器,{@link NettyRemotingServer} 可以包含多个子服务器,不同子服务器监听不同的端口 * NettyRemotingServer may hold multiple SubRemotingServer, each server will be stored in this container with a * ListenPort key. */ @@ -154,6 +157,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti * Server 段业务处理器,RocketMQ 服务端业务处理的入口 */ private NettyServerHandler serverHandler; + /** + * 服务端处理的请求码分布统计处理器,每秒统计服务端处理的请求码(和响应码)分布 + */ private RemotingCodeDistributionHandler distributionHandler; public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { @@ -302,14 +308,17 @@ public void run(Timeout timeout) { */ protected ChannelPipeline configChannel(SocketChannel ch) { return ch.pipeline() + // TLS 握手处理器 .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), + // 请求码统计处理器 distributionHandler, new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, + // RocketMQ 服务端业务处理器 serverHandler ); } @@ -400,6 +409,11 @@ public Pair getDefaultProcessorPair() { return defaultRequestProcessorPair; } + /** + * 创建新的子服务端 + * @param port 子服务端绑定的端口 + * @return 子服务端 + */ @Override public RemotingServer newRemotingServer(final int port) { SubRemotingServer remotingServer = new SubRemotingServer(port, @@ -588,6 +602,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler + * 用于云服务的逻辑多租中分辨客户端和逻辑实例 + */ public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { @Override @@ -842,6 +864,11 @@ private void handleWithMessage(HAProxyMessage msg, Channel channel) { } } + /** + * 解析 Proxy Protocol 的 TLV,即 Type-Length-Value,Proxy Protocol 中带的自定义参数 + * @param tlv + * @param channel + */ protected void handleHAProxyTLV(HAProxyTLV tlv, Channel channel) { byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); if (!BinaryUtil.isAscii(valueBytes)) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index b7e0bb95f7..134c16748a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -26,6 +26,9 @@ public class NettyServerConfig implements Cloneable { * By default, it's wildcard address, listening all network interfaces. */ private String bindAddress = "0.0.0.0"; + /** + * 服务端绑定端口 + */ private int listenPort = 0; // Netty 业务线程个数 private int serverWorkerThreads = 8; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 0882818fea..4025211810 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -16,10 +16,6 @@ */ package org.apache.rocketmq.remoting.netty; -import io.netty.channel.Channel; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -27,6 +23,15 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.channel.Channel; + +/** + * Netty RPC 调用响应 + */ public class ResponseFuture { private final Channel channel; private final int opaque; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index f6e3e9c898..f7683ddf4e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -18,8 +18,16 @@ import com.alibaba.fastjson.annotation.JSONField; import com.google.common.base.Stopwatch; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -33,14 +41,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.BoundaryType; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; @@ -86,7 +88,9 @@ public class RemotingCommand { private LanguageCode language = LanguageCode.JAVA; // 版本号 private int version = 0; - // 客户端请求序号 + /** + * 请求编号,即 requestId + */ private int opaque = requestId.getAndIncrement(); // 标记。倒数第一位表示请求类型,0:请求;1:返回。倒数第二位,1:单项发送 private int flag = 0;