diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 5df5cc8fa1a..0cb74725f61 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.client.consumer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.consumer.listener.MessageListener; @@ -170,6 +173,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private long adjustThreadPoolNumsThreshold = 100000; + /** + * Concurrently max span offset.it has no effect on sequential consumption + * Lifecycle managed by the provider + */ + private ScheduledExecutorService consumeMessageScheduledExecutor; + /** + * Thread pool for handling expired messages + * Lifecycle managed by the provider + */ + private ScheduledExecutorService cleanExpireMsgScheduledExecutor; + /** + * Thread pool for handling normal messages + * Lifecycle managed by the provider + */ + private ExecutorService consumeExecutor; + /** * Concurrently max span offset.it has no effect on sequential consumption */ @@ -927,6 +946,30 @@ public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold; } + public ScheduledExecutorService getConsumeMessageScheduledExecutor() { + return consumeMessageScheduledExecutor; + } + + public void setConsumeMessageScheduledExecutor(ScheduledExecutorService consumeMessageScheduledExecutor) { + this.consumeMessageScheduledExecutor = consumeMessageScheduledExecutor; + } + + public ScheduledExecutorService getCleanExpireMsgScheduledExecutor() { + return cleanExpireMsgScheduledExecutor; + } + + public void setCleanExpireMsgScheduledExecutor(ScheduledExecutorService cleanExpireMsgScheduledExecutor) { + this.cleanExpireMsgScheduledExecutor = cleanExpireMsgScheduledExecutor; + } + + public ExecutorService getConsumeExecutor() { + return consumeExecutor; + } + + public void setConsumeExecutor(ExecutorService consumeExecutor) { + this.consumeExecutor = consumeExecutor; + } + public int getMaxReconsumeTimes() { return maxReconsumeTimes; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index b151fefbbb3..5d6f7c94e8a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -54,11 +55,14 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final BlockingQueue consumeRequestQueue; - private final ThreadPoolExecutor consumeExecutor; + private final ExecutorService consumeExecutor; + private final boolean isConsumeExecutorFromUser; private final String consumerGroup; private final ScheduledExecutorService scheduledExecutorService; + private final boolean isScheduledExecutorServiceFromUser; private final ScheduledExecutorService cleanExpireMsgExecutors; + private final boolean isCleanExpireMsgExecutorsFromUser; public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { @@ -68,18 +72,36 @@ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPush this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<>(); - String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_"; - this.consumeExecutor = new ThreadPoolExecutor( - this.defaultMQPushConsumer.getConsumeThreadMin(), - this.defaultMQPushConsumer.getConsumeThreadMax(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.consumeRequestQueue, - new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag)); - - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag)); - this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag)); + if (this.defaultMQPushConsumer.getConsumeExecutor() != null) { + this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor(); + this.isConsumeExecutorFromUser = true; + } else { + this.isConsumeExecutorFromUser = false; + this.consumeExecutor = new ThreadPoolExecutor( + this.defaultMQPushConsumer.getConsumeThreadMin(), + this.defaultMQPushConsumer.getConsumeThreadMax(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.consumeRequestQueue, + new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag)); + } + + if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) { + this.isScheduledExecutorServiceFromUser = true; + this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor(); + } else { + this.isScheduledExecutorServiceFromUser = false; + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag)); + } + + if (this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor() != null) { + this.isCleanExpireMsgExecutorsFromUser = true; + this.cleanExpireMsgExecutors = this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor(); + } else { + this.isCleanExpireMsgExecutorsFromUser = false; + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag)); + } } public void start() { @@ -98,9 +120,15 @@ public void run() { } public void shutdown(long awaitTerminateMillis) { - this.scheduledExecutorService.shutdown(); - ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); - this.cleanExpireMsgExecutors.shutdown(); + if (!isScheduledExecutorServiceFromUser) { + this.scheduledExecutorService.shutdown(); + } + if (!isConsumeExecutorFromUser) { + ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); + } + if (!isCleanExpireMsgExecutorsFromUser) { + this.cleanExpireMsgExecutors.shutdown(); + } } @Override @@ -108,7 +136,9 @@ public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { - this.consumeExecutor.setCorePoolSize(corePoolSize); + if (consumeExecutor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)this.consumeExecutor).setCorePoolSize(corePoolSize); + } } } @@ -124,7 +154,10 @@ public void decCorePoolSize() { @Override public int getCorePoolSize() { - return this.consumeExecutor.getCorePoolSize(); + if (consumeExecutor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)this.consumeExecutor).getCorePoolSize(); + } + return -1; } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 3ca465da70d..d954594c2a2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -58,10 +59,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerOrderly messageListener; private final BlockingQueue consumeRequestQueue; - private final ThreadPoolExecutor consumeExecutor; + private final ExecutorService consumeExecutor; + private final boolean isConsumeExecutorFromUser; private final String consumerGroup; private final MessageQueueLock messageQueueLock = new MessageQueueLock(); private final ScheduledExecutorService scheduledExecutorService; + private final boolean isScheduledExecutorServiceFromUser; private volatile boolean stopped = false; public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, @@ -72,17 +75,28 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<>(); - String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_"; - this.consumeExecutor = new ThreadPoolExecutor( - this.defaultMQPushConsumer.getConsumeThreadMin(), - this.defaultMQPushConsumer.getConsumeThreadMax(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.consumeRequestQueue, - new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag)); - - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag)); + if (this.defaultMQPushConsumer.getConsumeExecutor() != null) { + this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor(); + this.isConsumeExecutorFromUser = true; + } else { + this.isConsumeExecutorFromUser = false; + this.consumeExecutor = new ThreadPoolExecutor( + this.defaultMQPushConsumer.getConsumeThreadMin(), + this.defaultMQPushConsumer.getConsumeThreadMax(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.consumeRequestQueue, + new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag)); + } + + if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) { + this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor(); + isScheduledExecutorServiceFromUser = true; + } else { + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag)); + isScheduledExecutorServiceFromUser = false; + } } @Override @@ -104,8 +118,12 @@ public void run() { @Override public void shutdown(long awaitTerminateMillis) { this.stopped = true; - this.scheduledExecutorService.shutdown(); - ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); + if (!isScheduledExecutorServiceFromUser) { + this.scheduledExecutorService.shutdown(); + } + if (!isConsumeExecutorFromUser) { + ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS); + } if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { this.unlockAllMQ(); } @@ -120,7 +138,9 @@ public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { - this.consumeExecutor.setCorePoolSize(corePoolSize); + if (consumeExecutor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)this.consumeExecutor).setCorePoolSize(corePoolSize); + } } } @@ -134,7 +154,10 @@ public void decCorePoolSize() { @Override public int getCorePoolSize() { - return this.consumeExecutor.getCorePoolSize(); + if (consumeExecutor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)this.consumeExecutor).getCorePoolSize(); + } + return -1; } @Override