diff --git a/pom.xml b/pom.xml index db47311..0be6495 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.summerboot jexpress - 2.4.17 + 2.4.18 jar Summer Boot jExpress Summer Boot jExpress focuses on solving non-functional and operational maintainability requirements, @@ -58,20 +58,29 @@ release + + UTF-8 + 17 + 2.1.0 + 3.3.1 + 3.11.2 + 3.1.3 + 3.2.7 + 1.7.0 + - org.apache.maven.plugins maven-scm-plugin - 2.1.0 + ${maven-scm.version} org.apache.maven.plugins maven-source-plugin - 3.3.1 + ${maven-source.version} attach-sources @@ -86,15 +95,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.10.1 - - 17 - - UTF-8 - UTF-8 - UTF-8 - - + ${maven-javadoc.version} package @@ -110,7 +111,7 @@ org.apache.maven.plugins maven-deploy-plugin - 3.1.3 + ${maven-deploy.version} default-deploy @@ -125,7 +126,7 @@ org.apache.maven.plugins maven-gpg-plugin - 3.2.7 + ${maven-gpg.version} verify @@ -140,7 +141,7 @@ org.sonatype.plugins nexus-staging-maven-plugin - 1.7.0 + ${nexus-staging-maven-plugin.version} true ossrh @@ -178,14 +179,15 @@ UTF-8 - 17 - 17 + + + 17 3.4.0 3.13.0 - 3.8.0 + 3.8.1 3.4.2 3.3.1 - 3.5.1 + 3.5.2 3.1.3 3.17.0 @@ -194,7 +196,7 @@ - 2.24.2 + 2.24.3 4.0.0 2.0.1 @@ -204,14 +206,14 @@ 0.12.6 - 4.1.115.Final + 4.1.116.Final 2.0.69.Final - 1.68.1 - 33.3.1-jre - 4.28.3 + 1.69.0 + 33.4.0-jre + 4.29.2 - 2.2.26 + 2.2.27 @@ -223,17 +225,17 @@ 0.10.2 - 2.18.1 + 2.18.2 6.0.1 - 11.0.1 - 8.0.1.Final + 11.0.2 + 8.0.2.Final 7.0.0 - 6.6.3.Final + 6.6.4.Final 6.2.1 @@ -243,7 +245,7 @@ 1.2.5 - 2.3.33 + 2.3.34 3.5.3 diff --git a/src/main/java/org/summerboot/jexpress/boot/BackOffice.java b/src/main/java/org/summerboot/jexpress/boot/BackOffice.java index 35f5be7..0811d26 100644 --- a/src/main/java/org/summerboot/jexpress/boot/BackOffice.java +++ b/src/main/java/org/summerboot/jexpress/boot/BackOffice.java @@ -58,7 +58,7 @@ protected void loadCustomizedConfigs(File cfgFile, boolean isReal, ConfigUtil he tpeMax = CPU_CORE + 1; } - tpe = buildThreadPoolExecutor(tpe, "BackOffice", ThreadingMode.Mixed, + tpe = buildThreadPoolExecutor(tpe, "BackOffice", tpeThreadingMode, tpeCore, tpeMax, tpeQueue, tpeKeepAliveSeconds, new ThreadPoolExecutor.DiscardPolicy(), prestartAllCoreThreads, allowCoreThreadTimeOut, false); @@ -191,6 +191,12 @@ public Map getBootErrorCodeMapping() { @Config(key = "portinuse.alert.message", defaultValue = ALERT_MSG_PORT_IN_USE) private String portInUseAlertMessage = ALERT_MSG_PORT_IN_USE; + @Config(key = "backoffice.executor.mode", defaultValue = "VirtualThread", + desc = "valid value = VirtualThread (default for Java 21+), CPU, IO and Mixed (default for old Java) \n use CPU core + 1 when application is CPU bound\n" + + "use CPU core x 2 + 1 when application is I/O bound\n" + + "need to find the best value based on your performance test result when nio.server.BizExecutor.mode=Mixed") + protected volatile ThreadingMode tpeThreadingMode = ThreadingMode.VirtualThread; + @Config(key = "backoffice.executor.core", defaultValue = "3", desc = "0 = current computer/VM's available processors + 1") private int tpeCore = 3; diff --git a/src/main/java/org/summerboot/jexpress/boot/BootConstant.java b/src/main/java/org/summerboot/jexpress/boot/BootConstant.java index 8371396..a63734c 100644 --- a/src/main/java/org/summerboot/jexpress/boot/BootConstant.java +++ b/src/main/java/org/summerboot/jexpress/boot/BootConstant.java @@ -27,7 +27,7 @@ public interface BootConstant { String APP_ID = String.format("%06d", new Random().nextInt(999999)); //version - String VERSION = "jExpress 2.4.17"; + String VERSION = "jExpress 2.4.18"; String JEXPRESS_PACKAGE_NAME = "org.summerboot.jexpress"; String DEFAULT_ADMIN_MM = "changeit"; diff --git a/src/main/java/org/summerboot/jexpress/boot/SummerApplication.java b/src/main/java/org/summerboot/jexpress/boot/SummerApplication.java index d688290..2e30f07 100644 --- a/src/main/java/org/summerboot/jexpress/boot/SummerApplication.java +++ b/src/main/java/org/summerboot/jexpress/boot/SummerApplication.java @@ -310,7 +310,8 @@ public void start() { int port = bindingAddress.getPort(); log.trace("5a. binding gRPC on {}:{}", host, port); try (var a = Timeout.watch("starting gRPCServer at " + host + ":" + port, timeoutMs).withDesc(timeoutDesc)) { - GRPCServer gRPCServer = new GRPCServer(host, port, gRPCCfg.getKmf(), gRPCCfg.getTmf(), serverInterceptor, gRPCCfg.getTpe(), nioListener); + boolean useVirtualThread = gRPCCfg.getTpeThreadingMode().equals(GRPCServerConfig.ThreadingMode.VirtualThread); + GRPCServer gRPCServer = new GRPCServer(host, port, gRPCCfg.getKmf(), gRPCCfg.getTmf(), serverInterceptor, gRPCCfg.getTpe(), useVirtualThread, nioListener); ServerBuilder serverBuilder = gRPCServer.getServerBuilder(); for (Class c : gRPCBindableServiceImplClasses) { BindableService impl = guiceInjector.getInstance(c); diff --git a/src/main/java/org/summerboot/jexpress/boot/config/BootConfig.java b/src/main/java/org/summerboot/jexpress/boot/config/BootConfig.java index 9a8f756..af358ef 100644 --- a/src/main/java/org/summerboot/jexpress/boot/config/BootConfig.java +++ b/src/main/java/org/summerboot/jexpress/boot/config/BootConfig.java @@ -27,6 +27,7 @@ import org.summerboot.jexpress.boot.config.annotation.Config; import org.summerboot.jexpress.boot.config.annotation.ConfigHeader; import org.summerboot.jexpress.boot.config.annotation.ImportResource; +import org.summerboot.jexpress.nio.server.AbortPolicyWithReport; import org.summerboot.jexpress.security.SecurityUtil; import org.summerboot.jexpress.util.ApplicationUtil; import org.summerboot.jexpress.util.BeanUtil; @@ -59,6 +60,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -588,22 +590,23 @@ protected static int getLength(String s) { protected static final int CPU_CORE = Runtime.getRuntime().availableProcessors(); public enum ThreadingMode { - CPU, IO, Mixed + VirtualThread, CPU, IO, Mixed + } + + public static ThreadPoolExecutor buildThreadPoolExecutor(String tpeName) { + return buildThreadPoolExecutor(null, tpeName, ThreadingMode.VirtualThread, 0, 0, Integer.MAX_VALUE, 60, null, false, true, false); + } + + public static ThreadPoolExecutor buildThreadPoolExecutor(String tpeName, ThreadingMode threadingMode, int core, int max, int queue, long keepAliveSec) { + return buildThreadPoolExecutor(null, tpeName, threadingMode, core, max, queue, keepAliveSec, null, false, false, false); } public static ThreadPoolExecutor buildThreadPoolExecutor(ThreadPoolExecutor tpe, String tpeName, ThreadingMode threadingMode, int core, int max, int queue, long keepAliveSec, RejectedExecutionHandler rejectedExecutionHandler, boolean prestartAllCoreThreads, boolean allowCoreThreadTimeOut, boolean isSingleton) { + boolean useVirtualThread = false; switch (threadingMode) { - case CPU:// use CPU core + 1 when application is CPU bound - core = CPU_CORE + 1; - max = CPU_CORE + 1; - break; - case IO:// use CPU core x 2 + 1 when application is I/O bound - core = CPU_CORE * 2 + 1; - max = CPU_CORE * 2 + 1; - break; - case Mixed:// manual config is required when it is mixed + case Mixed, VirtualThread -> {// manual config is required when it is mixed if (core < 1) { core = CPU_CORE * 2 + 1; } @@ -614,7 +617,15 @@ public static ThreadPoolExecutor buildThreadPoolExecutor(ThreadPoolExecutor tpe, //helper.addError("BizExecutor.MaxSize should not less than BizExecutor.CoreSize"); max = core; } - break; + } + case CPU -> {// use CPU core + 1 when application is CPU bound + core = CPU_CORE + 1; + max = CPU_CORE + 1; + } + case IO -> {// use CPU core x 2 + 1 when application is I/O bound + core = CPU_CORE * 2 + 1; + max = CPU_CORE * 2 + 1; + } } boolean isQueueChanged = false; @@ -627,9 +638,13 @@ public static ThreadPoolExecutor buildThreadPoolExecutor(ThreadPoolExecutor tpe, //backup old ThreadPoolExecutor old = tpe; //create new - BlockingQueue bq = queue > 0 ? new LinkedBlockingQueue<>(queue) : new EmptyBlockingQueue(); - tpe = new ThreadPoolExecutor(core, max, keepAliveSec, TimeUnit.SECONDS, bq, - new NamedDefaultThreadFactory(tpeName), rejectedExecutionHandler);//.DiscardOldestPolicy() + ThreadFactory factory = NamedDefaultThreadFactory.build(tpeName, useVirtualThread); + BlockingQueue workQueue = queue > 0 ? new LinkedBlockingQueue<>(queue) : new EmptyBlockingQueue(); + if (rejectedExecutionHandler == null) { + rejectedExecutionHandler = new AbortPolicyWithReport(tpeName); + } + tpe = new ThreadPoolExecutor(core, max, keepAliveSec, TimeUnit.SECONDS, workQueue, + factory, rejectedExecutionHandler);//.DiscardOldestPolicy() // then shotdown old tpe if (old != null) { old.shutdown(); diff --git a/src/main/java/org/summerboot/jexpress/boot/config/NamedDefaultThreadFactory.java b/src/main/java/org/summerboot/jexpress/boot/config/NamedDefaultThreadFactory.java index bc680e6..1f6aae7 100644 --- a/src/main/java/org/summerboot/jexpress/boot/config/NamedDefaultThreadFactory.java +++ b/src/main/java/org/summerboot/jexpress/boot/config/NamedDefaultThreadFactory.java @@ -7,19 +7,17 @@ public class NamedDefaultThreadFactory implements ThreadFactory { protected static final AtomicInteger poolNumber = new AtomicInteger(1); protected final ThreadGroup group; - protected final AtomicInteger threadNumber = new AtomicInteger(1); + protected final AtomicInteger threadCounter = new AtomicInteger(1); protected final String namePrefix; - public NamedDefaultThreadFactory(String name) { + private NamedDefaultThreadFactory(String tpeName) { group = Thread.currentThread().getThreadGroup(); - namePrefix = name + "-" - + poolNumber.getAndIncrement() - + "-thread-"; + namePrefix = tpeName; } @Override public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + Thread t = new Thread(group, r, namePrefix + threadCounter.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } @@ -28,4 +26,11 @@ public Thread newThread(Runnable r) { } return t; } + + public static ThreadFactory build(String tpeName, boolean useVirtualThread) { + String namePrefix = tpeName + "-" + + poolNumber.getAndIncrement() + + (useVirtualThread ? "-vt-" : "-pt-"); + return new NamedDefaultThreadFactory(namePrefix); + } } diff --git a/src/main/java/org/summerboot/jexpress/boot/instrumentation/jmx/ServerStatus.java b/src/main/java/org/summerboot/jexpress/boot/instrumentation/jmx/ServerStatus.java index ce33cc9..ff71b5e 100644 --- a/src/main/java/org/summerboot/jexpress/boot/instrumentation/jmx/ServerStatus.java +++ b/src/main/java/org/summerboot/jexpress/boot/instrumentation/jmx/ServerStatus.java @@ -45,7 +45,7 @@ public class ServerStatus extends NotificationBroadcasterSupport implements NIOS protected static final DateTimeFormatter DTF = DateTimeFormatter.ISO_LOCAL_DATE_TIME;//DateTimeFormatter.ofPattern("yyyy-MM-dd E HH:mm:ss"); - protected static final ExecutorService QPS_SERVICE = Executors.newSingleThreadExecutor(new NamedDefaultThreadFactory("ServerStatus")); + protected static final ExecutorService QPS_SERVICE = Executors.newSingleThreadExecutor(NamedDefaultThreadFactory.build("ServerStatus", true)); protected final LinkedList events; diff --git a/src/main/java/org/summerboot/jexpress/integration/cache/BootCache_RedisImple.java b/src/main/java/org/summerboot/jexpress/integration/cache/BootCache_RedisImple.java index fcfe6e1..5157879 100644 --- a/src/main/java/org/summerboot/jexpress/integration/cache/BootCache_RedisImple.java +++ b/src/main/java/org/summerboot/jexpress/integration/cache/BootCache_RedisImple.java @@ -19,11 +19,10 @@ import com.google.inject.Singleton; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.summerboot.jexpress.boot.config.NamedDefaultThreadFactory; +import org.summerboot.jexpress.boot.config.BootConfig; import org.summerboot.jexpress.integration.cache.domain.FlashSale; import org.summerboot.jexpress.integration.smtp.PostOffice; import org.summerboot.jexpress.integration.smtp.SMTPClientConfig; -import org.summerboot.jexpress.nio.server.AbortPolicyWithReport; import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; @@ -36,7 +35,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -86,7 +84,9 @@ public class BootCache_RedisImple implements AuthTokenCache, BootCache { protected static final Logger log = LogManager.getLogger(BootCache_RedisImple.class.getName()); - protected static final ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new NamedDefaultThreadFactory("Redis"), new AbortPolicyWithReport("Cahce.BackofficeExecutor")); + protected static final ThreadPoolExecutor tpe = BootConfig.buildThreadPoolExecutor(null, "Redis", BootConfig.ThreadingMode.VirtualThread, + 1, 1, 1, 60, null, + false, false, false); protected static final RuntimeException REDIS_MASTER_NULL_EX = new RuntimeException("Redis master is null"); diff --git a/src/main/java/org/summerboot/jexpress/integration/httpclient/HttpClientConfig.java b/src/main/java/org/summerboot/jexpress/integration/httpclient/HttpClientConfig.java index effcbc8..4345d60 100644 --- a/src/main/java/org/summerboot/jexpress/integration/httpclient/HttpClientConfig.java +++ b/src/main/java/org/summerboot/jexpress/integration/httpclient/HttpClientConfig.java @@ -24,7 +24,6 @@ import org.summerboot.jexpress.boot.config.annotation.Config; import org.summerboot.jexpress.boot.config.annotation.ConfigHeader; import org.summerboot.jexpress.boot.instrumentation.HTTPClientStatusListener; -import org.summerboot.jexpress.nio.server.AbortPolicyWithReport; import org.summerboot.jexpress.security.SSLUtil; import javax.net.ssl.KeyManager; @@ -167,6 +166,12 @@ protected void generateTemplate_truststore(StringBuilder sb) { @Config(key = "httpclient.timeout.ms", desc = "The maximum time to wait from the beginning of the connection establishment until the server sends data back, this is the end-to-end timeout.") protected volatile long httpClientTimeoutMs = 5000; + @Config(key = "httpclient.executor.mode", defaultValue = "VirtualThread", + desc = "valid value = VirtualThread (default for Java 21+), CPU, IO and Mixed (default for old Java) \n use CPU core + 1 when application is CPU bound\n" + + "use CPU core x 2 + 1 when application is I/O bound\n" + + "need to find the best value based on your performance test result when nio.server.BizExecutor.mode=Mixed") + protected volatile ThreadingMode tpeThreadingMode = ThreadingMode.VirtualThread; + @Config(key = "httpclient.executor.CoreSize", predefinedValue = "0", desc = "CoreSize 0 = current computer/VM's available processors x 2 + 1") protected volatile int httpClientCoreSize = BootConstant.CPU_CORE * 2 + 1;// how many tasks running at the same time @@ -252,15 +257,7 @@ protected void loadCustomizedConfigs(File cfgFile, boolean isReal, ConfigUtil he } // 3.3 HTTP Client Executor - if (httpClientCoreSize <= 0) { - httpClientCoreSize = BootConstant.CPU_CORE * 2 + 1; - } - if (httpClientMaxSize <= 0) { - httpClientMaxSize = BootConstant.CPU_CORE * 2 + 1; - } - if (httpClientMaxSize < httpClientCoreSize) { - helper.addError("MaxSize should not less than CoreSize"); - } + // -Djdk.httpclient.keepalive.timeout=99999 //System.setProperty("jdk.httpclient.keepalive.timeout", "99999"); //System.setProperty("jdk.httpclient.connectionPoolSize", "1"); @@ -277,8 +274,8 @@ protected void loadCustomizedConfigs(File cfgFile, boolean isReal, ConfigUtil he ThreadPoolExecutor old = tpe; int currentTpeHashCode = old == null ? -1 : old.hashCode(); - tpe = buildThreadPoolExecutor(old, "HttpClient", ThreadingMode.Mixed, - httpClientCoreSize, httpClientMaxSize, httpClientQueueSize, tpeKeepAliveSeconds, new AbortPolicyWithReport("HttpClientExecutor"), + tpe = buildThreadPoolExecutor(old, "HttpClient", tpeThreadingMode, + httpClientCoreSize, httpClientMaxSize, httpClientQueueSize, tpeKeepAliveSeconds, null, prestartAllCoreThreads, allowCoreThreadTimeOut, false); boolean isHttpClientSettingsChanged = tpe.hashCode() != currentTpeHashCode; // 1. save @@ -323,7 +320,7 @@ httpClientCoreSize, httpClientMaxSize, httpClientQueueSize, tpeKeepAliveSeconds, } httpClient = builder.build(); // 3. register new - ses = Executors.newSingleThreadScheduledExecutor(new NamedDefaultThreadFactory("HttpClient.QPS_SERVICE")); + ses = Executors.newSingleThreadScheduledExecutor(NamedDefaultThreadFactory.build("HttpClient.QPS_SERVICE", tpeThreadingMode.equals(ThreadingMode.VirtualThread))); ses.scheduleAtFixedRate(() -> { int queue = tpe.getQueue().size(); int active = tpe.getActiveCount(); diff --git a/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServer.java b/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServer.java index 3d195c6..235d93a 100644 --- a/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServer.java +++ b/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServer.java @@ -66,7 +66,7 @@ public GRPCServiceCounter getServiceCounter() { return serviceCounter; } - public GRPCServer(String bindingAddr, int port, KeyManagerFactory kmf, TrustManagerFactory tmf, ServerInterceptor serverInterceptor, ThreadPoolExecutor tpe, NIOStatusListener nioListener) { + public GRPCServer(String bindingAddr, int port, KeyManagerFactory kmf, TrustManagerFactory tmf, ServerInterceptor serverInterceptor, ThreadPoolExecutor tpe, boolean useVirtualThread, NIOStatusListener nioListener) { this.bindingAddr = bindingAddr; this.port = port; serverCredentials = initTLS(kmf, tmf); @@ -79,7 +79,7 @@ public GRPCServer(String bindingAddr, int port, KeyManagerFactory kmf, TrustMana serverBuilder.intercept(serverInterceptor); } serverBuilder.executor(tpe); - initThreadPool(tpe, nioListener, bindingAddr, port); + initThreadPool(tpe, useVirtualThread, nioListener, bindingAddr, port); } protected ServerCredentials initTLS(KeyManagerFactory kmf, TrustManagerFactory tmf) { @@ -99,14 +99,14 @@ protected ServerCredentials initTLS(KeyManagerFactory kmf, TrustManagerFactory t * @param nioListener * @return */ - protected GRPCServiceCounter initThreadPool(ThreadPoolExecutor tpe, NIOStatusListener nioListener, String bindingAddr, int port) { + protected GRPCServiceCounter initThreadPool(ThreadPoolExecutor tpe, boolean useVirtualThread, NIOStatusListener nioListener, String bindingAddr, int port) { int interval = 1; final AtomicReference lastBizHitRef = new AtomicReference<>(); lastBizHitRef.set(-1L); long totalChannel = -1;//NioServerContext.COUNTER_TOTAL_CHANNEL.get(); long activeChannel = -1;//NioServerContext.COUNTER_ACTIVE_CHANNEL.get(); ScheduledExecutorService old2 = statusReporter; - statusReporter = Executors.newSingleThreadScheduledExecutor(new NamedDefaultThreadFactory("gRPC.QPS_SERVICE@" + bindingAddr + ":" + port)); + statusReporter = Executors.newSingleThreadScheduledExecutor(NamedDefaultThreadFactory.build("Netty-gRPC.QPS_SERVICE@" + bindingAddr + ":" + port, useVirtualThread)); final AtomicLong lastChecksum = new AtomicLong(0); String appInfo = "gRPC@" + BootConstant.VERSION + " " + BootConstant.PID; statusReporter.scheduleAtFixedRate(() -> { diff --git a/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServerConfig.java b/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServerConfig.java index b01accb..a0a3cb3 100644 --- a/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServerConfig.java +++ b/src/main/java/org/summerboot/jexpress/nio/grpc/GRPCServerConfig.java @@ -22,7 +22,6 @@ import org.summerboot.jexpress.boot.config.ConfigUtil; import org.summerboot.jexpress.boot.config.annotation.Config; import org.summerboot.jexpress.boot.config.annotation.ConfigHeader; -import org.summerboot.jexpress.nio.server.AbortPolicyWithReport; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; @@ -62,9 +61,11 @@ protected GRPCServerConfig() { @Config(key = ID + ".autostart", defaultValue = "true") protected volatile boolean autoStart; - @Config(key = ID + ".pool.BizExecutor.mode", defaultValue = "Mixed", - desc = "valid value = CPU (default), IO, Mixed") - protected volatile ThreadingMode tpeThreadingMode = ThreadingMode.Mixed; + @Config(key = ID + ".pool.BizExecutor.mode", defaultValue = "VirtualThread", + desc = "valid value = VirtualThread (default for Java 21+), CPU, IO and Mixed (default for old Java) \n use CPU core + 1 when application is CPU bound\n" + + "use CPU core x 2 + 1 when application is I/O bound\n" + + "need to find the best value based on your performance test result when nio.server.BizExecutor.mode=Mixed") + protected volatile ThreadingMode tpeThreadingMode = ThreadingMode.VirtualThread; @Config(key = ID + ".pool.coreSize", predefinedValue = "0", desc = "coreSize 0 = current computer/VM's available processors x 2 + 1") @@ -133,8 +134,8 @@ protected void preLoad(File cfgFile, boolean isReal, ConfigUtil helper, Properti @Override protected void loadCustomizedConfigs(File cfgFile, boolean isReal, ConfigUtil helper, Properties props) throws IOException { - tpe = buildThreadPoolExecutor(tpe, "gRPC.Biz", tpeThreadingMode, - tpeCore, tpeMax, tpeQueue, tpeKeepAliveSeconds, new AbortPolicyWithReport("gRPCThreadPoolExecutor"), + tpe = buildThreadPoolExecutor(tpe, "Netty-gRPC.Biz", tpeThreadingMode, + tpeCore, tpeMax, tpeQueue, tpeKeepAliveSeconds, null, prestartAllCoreThreads, allowCoreThreadTimeOut, true); } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/BootWebSocketHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/BootWebSocketHandler.java index eb5a104..fd1c485 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/BootWebSocketHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/BootWebSocketHandler.java @@ -102,7 +102,7 @@ protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) clients.remove(ctx.channel()); ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); ctx.close(); - log.info("Binary auth failed " + ctx.channel().remoteAddress()); + log.warn("Binary auth failed " + ctx.channel().remoteAddress()); return; } String responseText = onMessage(ctx, caller, data); @@ -123,7 +123,7 @@ protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) t clients.remove(ctx.channel()); ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); ctx.close(); - log.info("Text auth failed " + ctx.channel().remoteAddress() + ": " + txt); + log.warn("Text auth failed " + ctx.channel().remoteAddress() + ": " + txt); return; } ctx.channel().attr(KEY_CALLER).set(caller); @@ -192,18 +192,18 @@ public static void sendToAllChannels(WebSocketFrame message, boolean auth) { @Override public void handlerAdded(ChannelHandlerContext ctx) { clients.add(ctx.channel()); - log.debug(() -> "handlerAdded: " + ctx.channel().remoteAddress()); + log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.debug(() -> "channelActive: " + ctx.channel().remoteAddress()); + log.trace(() -> "channelActive: " + ctx.channel().remoteAddress()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) { clients.remove(ctx.channel()); - log.debug(() -> "handlerRemoved: " + ctx.channel().remoteAddress()); + log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress()); } @Override diff --git a/src/main/java/org/summerboot/jexpress/nio/server/NioConfig.java b/src/main/java/org/summerboot/jexpress/nio/server/NioConfig.java index b24ef37..b191316 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/NioConfig.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/NioConfig.java @@ -176,20 +176,23 @@ protected void generateTemplate_keystore(StringBuilder sb) { protected volatile int httpServerCodec_MaxChunkSize = 8192; @ConfigHeader(title = "4.2 Netty Performance - NIO and Biz Exector Pool") + @Config(key = "nio.server.EventLoopGroup.Acceptor.useVirtualThread", defaultValue = "false") + protected volatile boolean nioEventLoopGroupAcceptorUseVirtualThread = false; @Config(key = "nio.server.EventLoopGroup.AcceptorSize", defaultValue = "0", desc = "AcceptorSize 0 = number of bindings") protected volatile int nioEventLoopGroupAcceptorSize = 0; + @Config(key = "nio.server.EventLoopGroup.Worker.useVirtualThread", defaultValue = "false") + protected volatile boolean nioEventLoopGroupWorkerUseVirtualThread = false; @Config(key = "nio.server.EventLoopGroup.WorkerSize", predefinedValue = "0", desc = "WorkerSize 0 = current computer/VM's available processors x 2 + 1") protected volatile int nioEventLoopGroupWorkerSize = BootConstant.CPU_CORE * 2 + 1; - //protected volatile int nioEventLoopGroupExecutorSize; - @Config(key = "nio.server.BizExecutor.mode", defaultValue = "Mixed", - desc = "valid value = CPU, IO and Mixed (default) \nuse CPU core + 1 when application is CPU bound\n" + @Config(key = "nio.server.BizExecutor.mode", defaultValue = "VirtualThread", + desc = "valid value = VirtualThread (default for Java 21+), CPU, IO and Mixed (default for old Java) \n use CPU core + 1 when application is CPU bound\n" + "use CPU core x 2 + 1 when application is I/O bound\n" + "need to find the best value based on your performance test result when nio.server.BizExecutor.mode=Mixed") - protected volatile ThreadingMode tpeThreadingMode = ThreadingMode.Mixed; + protected volatile ThreadingMode tpeThreadingMode = ThreadingMode.VirtualThread; @Config(key = "nio.server.BizExecutor.CoreSize", predefinedValue = "0", desc = "CoreSize 0 = current computer/VM's available processors x 2 + 1") @@ -415,8 +418,8 @@ protected void loadCustomizedConfigs(File cfgFile, boolean isReal, ConfigUtil he nioEventLoopGroupWorkerSize = CPU_CORE * 2 + 1; } - tpe = buildThreadPoolExecutor(tpe, "NIO.Biz", tpeThreadingMode, - tpeCore, tpeMax, tpeQueue, tpeKeepAliveSeconds, new AbortPolicyWithReport("NIOBizThreadPoolExecutor"), + tpe = buildThreadPoolExecutor(tpe, "Netty-HTTP.Biz", tpeThreadingMode, + tpeCore, tpeMax, tpeQueue, tpeKeepAliveSeconds, null, prestartAllCoreThreads, allowCoreThreadTimeOut, false); BeanUtil.init(jsonParserTimeZone, fromJsonFailOnUnknownProperties, fromJsonCaseInsensitive, toJsonPretty, toJsonIgnoreNull); @@ -579,10 +582,18 @@ public int getNioEventLoopGroupAcceptorSize() { return nioEventLoopGroupAcceptorSize; } + public boolean isNioEventLoopGroupAcceptorUseVirtualThread() { + return nioEventLoopGroupAcceptorUseVirtualThread; + } + public int getNioEventLoopGroupWorkerSize() { return nioEventLoopGroupWorkerSize; } + public boolean isNioEventLoopGroupWorkerUseVirtualThread() { + return nioEventLoopGroupWorkerUseVirtualThread; + } + public ThreadingMode getTpeThreadingMode() { return tpeThreadingMode; } @@ -599,6 +610,18 @@ public int getTpeQueue() { return tpeQueue; } + public int getTpeKeepAliveSeconds() { + return tpeKeepAliveSeconds; + } + + public boolean isPrestartAllCoreThreads() { + return prestartAllCoreThreads; + } + + public boolean isAllowCoreThreadTimeOut() { + return allowCoreThreadTimeOut; + } + public long getBizTimeoutWarnThresholdMs() { return bizTimeoutWarnThresholdMs; } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/NioServer.java b/src/main/java/org/summerboot/jexpress/nio/server/NioServer.java index eb6cf14..085511e 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/NioServer.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/NioServer.java @@ -41,6 +41,7 @@ import org.apache.logging.log4j.Logger; import org.summerboot.jexpress.boot.BackOffice; import org.summerboot.jexpress.boot.BootConstant; +import org.summerboot.jexpress.boot.config.BootConfig; import org.summerboot.jexpress.boot.config.NamedDefaultThreadFactory; import org.summerboot.jexpress.boot.instrumentation.NIOStatusListener; @@ -135,11 +136,12 @@ public void bind(NioConfig nioCfg, StringBuilder memo) throws InterruptedExcepti // Configure the server. //boss and work groups + int bossSize = nioCfg.getNioEventLoopGroupAcceptorSize(); int workerSize = nioCfg.getNioEventLoopGroupWorkerSize(); Class serverChannelClass; - ThreadFactory threadFactoryBoss = new NamedDefaultThreadFactory("NIO.Boss"); - ThreadFactory threadFactoryWorker = new NamedDefaultThreadFactory("NIO.Worker"); + ThreadFactory threadFactoryBoss = NamedDefaultThreadFactory.build("Netty-HTTP.Boss", nioCfg.isNioEventLoopGroupAcceptorUseVirtualThread()); + ThreadFactory threadFactoryWorker = NamedDefaultThreadFactory.build("Netty-HTTP.Worker", nioCfg.isNioEventLoopGroupWorkerUseVirtualThread()); if (Epoll.isAvailable() && (IoMultiplexer.AVAILABLE.equals(multiplexer) || IoMultiplexer.EPOLL.equals(multiplexer))) { bossGroup = bossSize < 1 ? new EpollEventLoopGroup() : new EpollEventLoopGroup(bossSize, threadFactoryBoss); workerGroup = workerSize < 1 ? new EpollEventLoopGroup() : new EpollEventLoopGroup(workerSize, threadFactoryWorker); @@ -220,7 +222,8 @@ public void bind(NioConfig nioCfg, StringBuilder memo) throws InterruptedExcepti if (nioListener != null || log.isDebugEnabled()) { final AtomicLong lastChecksum = new AtomicLong(0); int interval = 1; - QPS_SERVICE = Executors.newSingleThreadScheduledExecutor(new NamedDefaultThreadFactory("NIO.QPS_SERVICE")); + boolean useVirtualThread = nioCfg.getTpeThreadingMode().equals(BootConfig.ThreadingMode.VirtualThread); + QPS_SERVICE = Executors.newSingleThreadScheduledExecutor(NamedDefaultThreadFactory.build("NIO.QPS_SERVICE", useVirtualThread)); QPS_SERVICE.scheduleAtFixedRate(() -> { long hps = NioCounter.COUNTER_HIT.getAndSet(0); long tps = NioCounter.COUNTER_SENT.getAndSet(0); diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java index 5e9bf16..19961b7 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java @@ -389,20 +389,31 @@ public Caller longin_JSON(@Nonnull LoginVo loginVo, return login(auth, loginVo.getUsername(), loginVo.getPassword(), context); } - public static Caller login(Authenticator auth, String userId, String password, ServiceContext context) throws NamingException { + public Caller login(Authenticator auth, String userId, String password, ServiceContext context) throws NamingException { if (auth == null) { context.error(new Err(BootErrorCode.ACCESS_BASE, null, null, null, "Authenticator not provided")).status(HttpResponseStatus.NOT_IMPLEMENTED); return null; } + if (!preLogin(userId, password, context)) { + return null; + } String jwt = auth.signJWT(userId, password, null, AuthConfig.cfg.getJwtTTLMinutes(), context); if (jwt == null) { context.status(HttpResponseStatus.UNAUTHORIZED); } else { context.responseHeader(Config.X_AUTH_TOKEN, jwt).status(HttpResponseStatus.CREATED); } + postLogin(context); return context.caller(); } + protected boolean preLogin(String userId, String password, ServiceContext context) { + return true; + } + + protected void postLogin(ServiceContext context) { + } + @Operation( tags = {TAG_USER_AUTH}, summary = "User logout",