forked from kangjianwei/LearningJDK
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadPoolExecutor.java
2901 lines (2571 loc) · 123 KB
/
ThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
* <p>Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios. Otherwise, use the following guide when manually
* configuring and tuning this class:
*
* <dl>
*
* <dt>Core and maximum pool sizes</dt>
*
* <dd>A {@code ThreadPoolExecutor} will automatically adjust the
* pool size (see {@link #getPoolSize})
* according to the bounds set by
* corePoolSize (see {@link #getCorePoolSize}) and
* maximumPoolSize (see {@link #getMaximumPoolSize}).
*
* When a new task is submitted in method {@link #execute(Runnable)},
* if fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. Else if fewer than maximumPoolSize threads are running, a
* new thread will be created to handle the request only if the queue
* is full. By setting corePoolSize and maximumPoolSize the same, you
* create a fixed-size thread pool. By setting maximumPoolSize to an
* essentially unbounded value such as {@code Integer.MAX_VALUE}, you
* allow the pool to accommodate an arbitrary number of concurrent
* tasks. Most typically, core and maximum pool sizes are set only
* upon construction, but they may also be changed dynamically using
* {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
*
* <dt>On-demand construction</dt>
*
* <dd>By default, even core threads are initially created and
* started only when new tasks arrive, but this can be overridden
* dynamically using method {@link #prestartCoreThread} or {@link
* #prestartAllCoreThreads}. You probably want to prestart threads if
* you construct the pool with a non-empty queue. </dd>
*
* <dt>Creating new threads</dt>
*
* <dd>New threads are created using a {@link ThreadFactory}. If not
* otherwise specified, a {@link Executors#defaultThreadFactory} is
* used, that creates threads to all be in the same {@link
* ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
* non-daemon status. By supplying a different ThreadFactory, you can
* alter the thread's name, thread group, priority, daemon status,
* etc. If a {@code ThreadFactory} fails to create a thread when asked
* by returning null from {@code newThread}, the executor will
* continue, but might not be able to execute any tasks. Threads
* should possess the "modifyThread" {@code RuntimePermission}. If
* worker threads or other threads using the pool do not possess this
* permission, service may be degraded: configuration changes may not
* take effect in a timely manner, and a shutdown pool may remain in a
* state in which termination is possible but not completed.</dd>
*
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. This parameter can also be
* changed dynamically using method {@link #setKeepAliveTime(long,
* TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
* TimeUnit#NANOSECONDS} effectively disables idle threads from ever
* terminating prior to shut down. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads, but
* method {@link #allowCoreThreadTimeOut(boolean)} can be used to
* apply this time-out policy to core threads as well, so long as the
* keepAliveTime value is non-zero. </dd>
*
* <dt>Queuing</dt>
*
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
* submitted tasks. The use of this queue interacts with pool sizing:
*
* <ul>
*
* <li>If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.
*
* <li>If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.
*
* <li>If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.
*
* </ul>
*
* There are three general strategies for queuing:
* <ol>
*
* <li><em> Direct handoffs.</em> A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. Here, an attempt to queue a task
* will fail if no threads are immediately available to run it, so a
* new thread will be constructed. This policy avoids lockups when
* handling sets of requests that might have internal dependencies.
* Direct handoffs generally require unbounded maximumPoolSizes to
* avoid rejection of new submitted tasks. This in turn admits the
* possibility of unbounded thread growth when commands continue to
* arrive on average faster than they can be processed.
*
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
* capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
* each task is completely independent of others, so tasks cannot
* affect each others execution; for example, in a web page server.
* While this style of queuing can be useful in smoothing out
* transient bursts of requests, it admits the possibility of
* unbounded work queue growth when commands continue to arrive on
* average faster than they can be processed.
*
* <li><em>Bounded queues.</em> A bounded queue (for example, an
* {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
* used with finite maximumPoolSizes, but can be more difficult to
* tune and control. Queue sizes and maximum pool sizes may be traded
* off for each other: Using large queues and small pools minimizes
* CPU usage, OS resources, and context-switching overhead, but can
* lead to artificially low throughput. If tasks frequently block (for
* example if they are I/O bound), a system may be able to schedule
* time for more threads than you otherwise allow. Use of small queues
* generally requires larger pool sizes, which keeps CPUs busier but
* may encounter unacceptable scheduling overhead, which also
* decreases throughput.
*
* </ol>
*
* </dd>
*
* <dt>Rejected tasks</dt>
*
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
* <em>rejected</em> when the Executor has been shut down, and also when
* the Executor uses finite bounds for both maximum threads and work queue
* capacity, and is saturated. In either case, the {@code execute} method
* invokes the {@link
* RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
* method of its {@link RejectedExecutionHandler}. Four predefined handler
* policies are provided:
*
* <ol>
*
* <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler
* throws a runtime {@link RejectedExecutionException} upon rejection.
*
* <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
* that invokes {@code execute} itself runs the task. This provides a
* simple feedback control mechanism that will slow down the rate that
* new tasks are submitted.
*
* <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
* cannot be executed is simply dropped.
*
* <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
* executor is not shut down, the task at the head of the work queue
* is dropped, and then execution is retried (which can fail again,
* causing this to be repeated.)
*
* </ol>
*
* It is possible to define and use other kinds of {@link
* RejectedExecutionHandler} classes. Doing so requires some care
* especially when policies are designed to work only under particular
* capacity or queuing policies. </dd>
*
* <dt>Hook methods</dt>
*
* <dd>This class provides {@code protected} overridable
* {@link #beforeExecute(Thread, Runnable)} and
* {@link #afterExecute(Runnable, Throwable)} methods that are called
* before and after execution of each task. These can be used to
* manipulate the execution environment; for example, reinitializing
* ThreadLocals, gathering statistics, or adding log entries.
* Additionally, method {@link #terminated} can be overridden to perform
* any special processing that needs to be done once the Executor has
* fully terminated.
*
* <p>If hook, callback, or BlockingQueue methods throw exceptions,
* internal worker threads may in turn fail, abruptly terminate, and
* possibly be replaced.</dd>
*
* <dt>Queue maintenance</dt>
*
* <dd>Method {@link #getQueue()} allows access to the work queue
* for purposes of monitoring and debugging. Use of this method for
* any other purpose is strongly discouraged. Two supplied methods,
* {@link #remove(Runnable)} and {@link #purge} are available to
* assist in storage reclamation when large numbers of queued tasks
* become cancelled.</dd>
*
* <dt>Reclamation</dt>
*
* <dd>A pool that is no longer referenced in a program <em>AND</em>
* has no remaining threads may be reclaimed (garbage collected)
* without being explicitly shutdown. You can configure a pool to
* allow all unused threads to eventually die by setting appropriate
* keep-alive times, using a lower bound of zero core threads and/or
* setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
*
* </dl>
*
* <p><b>Extension example</b>. Most extensions of this class
* override one or more of the protected hook methods. For example,
* here is a subclass that adds a simple pause/resume feature:
*
* <pre> {@code
* class PausableThreadPoolExecutor extends ThreadPoolExecutor {
* private boolean isPaused;
* private ReentrantLock pauseLock = new ReentrantLock();
* private Condition unpaused = pauseLock.newCondition();
*
* public PausableThreadPoolExecutor(...) { super(...); }
*
* protected void beforeExecute(Thread t, Runnable r) {
* super.beforeExecute(t, r);
* pauseLock.lock();
* try {
* while (isPaused) unpaused.await();
* } catch (InterruptedException ie) {
* t.interrupt();
* } finally {
* pauseLock.unlock();
* }
* }
*
* public void pause() {
* pauseLock.lock();
* try {
* isPaused = true;
* } finally {
* pauseLock.unlock();
* }
* }
*
* public void resume() {
* pauseLock.lock();
* try {
* isPaused = false;
* unpaused.signalAll();
* } finally {
* pauseLock.unlock();
* }
* }
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
/*
* ThreadPoolExecutor通常被简称为【线程池】,虽然与ForkJoinPool是兄弟关系,但两者的侧重点完全不同
*
* ThreadPoolExecutor由两部分组成:线程池(pool)与阻塞队列(queue)
* 线程池(workers)中存储Worker,一个Worker兼具【任务】与【线程】的含义
* 阻塞队列(workQueue)中存储排队的【任务】
*
* Worker可分为两类:【R】型Worker和【N】型Worker
* 【R】型Worker携带了【任务】,它被执行完后,转为【N】型Worker。
* 【N】型Worker自身不携带【任务】,它专门负责从阻塞队列中取出任务以便执行。
*
* 【R】型线程:【R】型Worker中的线程域,一对一关系
* 【N】型线程:【N】型Worker中的线程域,一对一关系
*
* ★★★ 1. 在不产生歧义的情形下,下面可能会混用【Worker】与【线程】两词
* ★★★ 2. 在前提1下,这里将:
* 【R】型Worker或【R】型线程简称做【R】
* 【N】型Worker或【N】型线程简称做【N】
*
* 线程池状态:
* -1 【运行】RUNNING : 接收新线程,并可以处理阻塞任务
* 0 【关闭】SHUTDOWN : 不接收新线程,但可以处理阻塞任务,
* 1 【停止】STOP : 不接收新线程,不可以处理阻塞任务,且对正在执行的线程设置中断标记
* 2 【完结】TIDYING : 线程池空闲,阻塞队列为空,在调用terminated()后转入TERMINATED状态
* 3 【终止】TERMINATED: terminated()方法已执行完
*
* 状态转换:
* shutdown() 线程池空闲,阻塞队列为空
* ┌─────────────▶ SHUTDOWN ────────────┐
* │ │ │ terminated()之后
* RUNNING ──┤ │ shutdownNow() ├──▶ TIDYING ───────────────▶ TERMINATED
* │ ▼ │
* └─────────────▶ STOP ────────────┘
* shutdownNow() 线程池空闲
*
*
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
/*
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
// Main pool control state: the run state is packed into the high-order 3 bits and the
// worker count into the low 29 bits; initialized to RUNNING with zero workers.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/* runState is stored in the high-order bits */
// Bit layout of ctl: the low COUNT_BITS bits hold the worker count
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF: mask extracting the worker count
// Run states occupy the high 3 bits; their numeric order matters (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED)
private static final int RUNNING = -1 << COUNT_BITS; // 111 in high bits: accept new tasks and process queued tasks
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000: no new tasks, but queued tasks are still processed
private static final int STOP = 1 << COUNT_BITS; // 001: no new tasks, no queued processing, in-progress tasks interrupted
private static final int TIDYING = 2 << COUNT_BITS; // 010: pool and queue empty; terminated() hook about to run
private static final int TERMINATED = 3 << COUNT_BITS; // 011: terminated() has completed
private static final boolean ONLY_ONE = true; // presumably a flag meaning "interrupt at most one idle worker" — confirm against interruptIdleWorkers
/**
 * Lock held on access to workers set and related bookkeeping.
 * While we could use a concurrent set of some sort, it turns out
 * to be generally preferable to use a lock. Among the reasons is
 * that this serializes interruptIdleWorkers, which avoids
 * unnecessary interrupt storms, especially during shutdown.
 * Otherwise exiting threads would concurrently interrupt those
 * that have not yet interrupted. It also simplifies some of the
 * associated statistics bookkeeping of largestPoolSize etc. We
 * also hold mainLock on shutdown and shutdownNow, for the sake of
 * ensuring workers set is stable while separately checking
 * permission to interrupt and actually interrupting.
 */
// Main pool lock, taken whenever the pool's bookkeeping state changes
private final ReentrantLock mainLock = new ReentrantLock();
/**
 * Wait condition to support awaitTermination.
 */
// Condition (on mainLock) that awaitTermination() waits on until the pool reaches TERMINATED
private final Condition termination = mainLock.newCondition();
/**
 * Set containing all worker threads in pool. Accessed only when holding mainLock.
 */
// The worker set, holding both core and non-core Workers; guarded by mainLock
private final HashSet<Worker> workerPool = new HashSet<>();
/**
 * The queue used for holding tasks and handing off to worker
 * threads. We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING). This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
// Blocking queue holding submitted tasks that are waiting to be executed by workers
private final BlockingQueue<Runnable> workQueue;
/*
 * All user control parameters are declared as volatiles so that
 * ongoing actions are based on freshest values, but without need
 * for locking, since no internal invariants depend on them
 * changing synchronously with respect to other actions.
 */
/**
 * Factory for new threads. All threads are created using this
 * factory (via method addWorker). All callers must be prepared
 * for addWorker to fail, which may reflect a system or user's
 * policy limiting the number of threads. Even though it is not
 * treated as an error, failure to create threads may result in
 * new tasks being rejected or existing ones remaining stuck in
 * the queue.
 *
 * We go further and preserve pool invariants even in the face of
 * errors such as OutOfMemoryError, that might be thrown while
 * trying to create threads. Such errors are rather common due to
 * the need to allocate a native stack in Thread.start, and users
 * will want to perform clean pool shutdown to clean up. There
 * will likely be enough memory available for the cleanup code to
 * complete without encountering yet another OutOfMemoryError.
 */
// Factory used (via addWorker) to create the threads that execute tasks
private volatile ThreadFactory threadFactory;
/**
 * Handler called when saturated or shutdown in execute.
 */
// Handler implementing the rejection policy, invoked when the pool is saturated or shut down
private volatile RejectedExecutionHandler handler;
/**
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting for work.
 */
// Whether core threads are also subject to the keepAliveTime idle timeout (default false)
private volatile boolean allowCoreThreadTimeOut;
/**
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
// Maximum idle time (nanoseconds) for excess workers — and for core workers when allowCoreThreadTimeOut is set
private volatile long keepAliveTime;
/**
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 *
 * Since the worker count is actually stored in COUNT_BITS bits,
 * the effective limit is {@code corePoolSize & COUNT_MASK}.
 */
// Core threshold (>= 0): target number of workers kept alive without timing out
private volatile int corePoolSize;
/**
 * Maximum pool size.
 *
 * Since the worker count is actually stored in COUNT_BITS bits,
 * the effective limit is {@code maximumPoolSize & COUNT_MASK}.
 */
// Maximum threshold (> 0): upper bound on the total number of workers (relevant once the queue is full)
private volatile int maximumPoolSize;
/**
 * Tracks largest attained pool size. Accessed only under mainLock.
 */
// High-water mark of the worker count reached over the pool's lifetime; guarded by mainLock
private int largestPoolSize;
/**
 * Counter for completed tasks. Updated only on termination of worker threads. Accessed only under mainLock.
 */
// Cumulative count of tasks executed by the pool; each exiting Worker adds its own completed-task tally
private long completedTaskCount;
/**
 * The default rejected execution handler.
 */
// Default rejection policy: drops the task and throws RejectedExecutionException (AbortPolicy)
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
 * Permission required for callers of shutdown and shutdownNow.
 * We additionally require (see checkShutdownAccess) that callers
 * have permission to actually interrupt threads in the worker set
 * (as governed by Thread.interrupt, which relies on
 * ThreadGroup.checkAccess, which in turn relies on
 * SecurityManager.checkAccess). Shutdowns are attempted only if
 * these checks pass.
 *
 * All actual invocations of Thread.interrupt (see
 * interruptIdleWorkers and interruptWorkers) ignore
 * SecurityExceptions, meaning that the attempted interrupts
 * silently fail. In the case of shutdown, they should not fail
 * unless the SecurityManager has inconsistent policies, sometimes
 * allowing access to a thread and sometimes not. In such cases,
 * failure to actually interrupt threads may disable or delay full
 * termination. Other uses of interruptIdleWorkers are advisory,
 * and failure to actually interrupt will merely delay response to
 * configuration changes so is not handled exceptionally.
 */
// Security permission ("modifyThread") checked for callers of shutdown() / shutdownNow()
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
/*▼ Constructors ████████████████████████████████████████████████████████████████████████████████┓ */
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters, the default thread factory and the default rejected
 * execution handler.
 *
 * <p>It may be more convenient to use one of the {@link Executors}
 * factory methods instead of this general purpose constructor.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed. This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue} is null
 */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// Delegate to the canonical constructor, supplying the default thread factory and the default (abort) rejection handler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters and {@linkplain ThreadPoolExecutor.AbortPolicy
 * default rejected execution handler}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed. This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} is null
 */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
// Delegate to the canonical constructor, supplying the default (abort) rejection handler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters and
 * {@linkplain Executors#defaultThreadFactory default thread factory}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed. This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
// Delegate to the canonical constructor, supplying the default thread factory
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
/*
* corePoolSize :线程池【核心阈值】,>=0,一般用来限制【R】的数量
* maximumPoolSize:线程池【最大阈值】,>0, 一般用来限制【N】的数量(线程池满,阻塞队列也满时,限制【R】的数量)
* keepAliveTime :【N】的最大空闲时间(启用了超时设置后生效)
* unit :时间单位
* workQueue :阻塞队列
* threadFactory :线程工厂
* handler :拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
    /*▲ Constructors ████████████████████████████████████████████████████████████████████████████████┛ */
    /*▼ Task creation / execution / cleanup ████████████████████████████████████████████████████████████████████████████████┓ */
    /**
     * Executes the given task sometime in the future. The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@link RejectedExecutionHandler}.
     *
     * @param command the task to execute
     *
     * @throws RejectedExecutionException at discretion of
     * {@code RejectedExecutionHandler}, if the task
     * cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    // Executes the given task: depending on pool state it is either wrapped in
    // a new worker thread or placed on the blocking queue to wait its turn.
    public void execute(Runnable command) {
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running,
         * try to start a new thread with the given command as its first task.
         * The call to addWorker atomically checks runState and workerCount,
         * and so prevents false alarms that would add threads when it shouldn't,
         * by returning false.
         *
         * 2. If a task can be successfully queued,
         * then we still need to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method.
         * So we recheck state and if necessary roll back the enqueuing if stopped,
         * or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new thread.
         * If it fails, we know we are shut down or saturated and so reject the task.
         */
        // A null task is not allowed
        if(command == null) {
            throw new NullPointerException();
        }
        // Snapshot the pool control word (packs run state and worker count)
        int state = ctl.get();
        // Worker count is still below the core-pool threshold
        if(workerCountOf(state)<corePoolSize) {
            // Try to add and start a new worker with this task as its first task
            if(addWorker(command, true)) {
                // Worker added and started; nothing more to do
                return;
            }
            // addWorker failed (state changed concurrently); re-read the control word
            state = ctl.get();
        }
        /*
         * Reaching here means one of three things:
         * 1. The worker count reached the core threshold (some pools set it to 0)
         * 2. addWorker failed because the pool state changed under contention
         * 3. The new worker failed to start (rare)
         *
         * Next, try to place the task on the blocking queue.
         */
        // {-1} If the pool is still RUNNING (accepting new tasks) and the task
        // was queued successfully
        if(isRunning(state) && workQueue.offer(command)) {
            /*
             * The task is now on the blocking queue.
             *
             * Try to add an idle (no-first-task) worker so that someone is
             * available to pull this task off the queue.
             *
             * Failing to add one here is harmless: a running worker will
             * eventually finish its current task and start draining the queue.
             */
            // Re-check the pool control word
            int recheck = ctl.get();
            // {0123} The pool has left RUNNING, so it no longer accepts new tasks
            if(!isRunning(recheck)) {
                // Roll back: remove the task we just queued
                if(remove(command)) {
                    // The task will not be executed; apply the rejection policy
                    reject(command);
                }
            } else {
                /* {-1} Still RUNNING here */
                // Pool has no workers at all (typical when corePoolSize is 0)
                if(workerCountOf(recheck) == 0) {
                    // Add one idle worker so the queued task gets processed
                    addWorker(null, false);
                }
                /* Otherwise do nothing: an existing worker will pick the task up */
            }
        } else {
            /*
             * Reaching here means either:
             * 1. {0123} the pool has left RUNNING and accepts no new workers, or
             * 2. {-1} the pool is RUNNING but the queue refused the task (queue full)
             *
             * Case 1:
             * addWorker(command, false) will normally return false, and the
             * rejection policy is applied.
             *
             * Case 2:
             * The queue is full, so retry with maximumPoolSize as the capacity
             * bound, wrapping the task in a new worker. If that also fails
             * (e.g. when core and maximum thresholds are equal), reject.
             */
            if(!addWorker(command, false)) {
                // The task could not enter the pool; apply the rejection policy
                reject(command);
            }
        }
    }
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
*
* @return true if successful
*/
/*
* 将给定任务(如果存在)包装到Worker中,然后将Worker添加到线程池,并启动Worker
*
* firstTask为null时添加【N】型Worker,否则,添加【R】型Worker
* core==true表示使用【核心阈值】,core==false表示使用【最大阈值】
*
* 返回true代表Worker被成功添加,而且被成功启动
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for(int state = ctl.get(); ; ) {
/* Check if queue empty only if necessary */
// {0123} 如果线程池不再接收新线程
if(runStateAtLeast(state, SHUTDOWN)) {
// {123} 如果线程池不再处理阻塞任务,直接返回
if(runStateAtLeast(state, STOP)) {
return false;
}
/* {0} 至此,线程池处于【关闭】状态,即线程池不再接收新线程,但可以处理阻塞任务 */
// 如果任务不为空,直接返回(因为不再接收新线程[任务]了)
if(firstTask != null) {
return false;
}
// 如果阻塞队列为空,直接返回(因为虽然可以处理阻塞任务,但已经没有阻塞任务了)
if(workQueue.isEmpty()){
return false;
}
}
/*
* 至此,线程池有两种状态:
*
* 1.线程池处于【运行】状态{-1},可以接收新线程,并可以处理阻塞任务
* 2.线程池处于【关闭】状态{0},但firstTask==null(是【N】型Worker),而且阻塞队列不为空(有阻塞任务)
*/
// 原子地增加线程池中的Worker数量
for (;;) {
// 获取当前情境下需要使用的阈值限制
int count = (core ? corePoolSize : maximumPoolSize) & COUNT_MASK;
/*
* 至此,由于形参的不同组合,出现了4种情形:
*
* 1.(command, true) ==> firstTask!=null,count==corePoolSize
* 添加【R】到线程池,使用【核心阈值】;
* 线程池的Worker数量在【核心阈值】以内,此时可以持续增加【R】型Worker。
*
* 2.(command, false) ==> firstTask!=null,count==maximumPoolSize
* 添加【R】到线程池,使用【最大阈值】;
* 线程池正处于【运行】状态,且工作线程数量达到了核心阈值,而且阻塞队列也满了,
* 此时无法任务添加到阻塞队列,于是只能将线程池容量阈值扩大为【最大阈值】,
* 以便继续添加【R】型Worker去执行该任务。
*
* 3.(null, false) ==> firstTask==null,count==maximumPoolSize
* 添加【N】到线程池,使用【最大阈值】;
* 3.1 用在最低保障启动中,参见ensurePrestart()
* 3.2 工作线程被异常结束,或者,工作线程数量已经低于最小预留值的保障,此时也需要新增一个【N】型Worker去处理阻塞队列中的任务
* 3.3 线程池为空时(例如核心阈值为0),此时也需要专门添加一个【N】型Worker去处理阻塞队列中的任务
*
* 4.(null, true) ==> firstTask==null,count==corePoolSize
* 添加【N】到线程池,使用【核心阈值】;
* 4.1 用在最低保障启动中,参见ensurePrestart()
* 4.2 用在预启动中,参见prestartAllCoreThreads()和prestartCoreThread()
* 4.3 扩大了【核心阈值】,此时也需要补充【N】型Worker,以便与阻塞队列的容量匹配
*/
// 如果线程池中的工作线程(Worker)数量超过了对应情形下的阈值,则返回fasle,表示无法再添加新的任务
if(workerCountOf(state) >= count) {
return false;
}
// 原子地递【增】线程池中工作线程(Worker)的数量,并更新线程池状态标记
if(compareAndIncrementWorkerCount(state)) {
// 任务数量更新成功,跳出双重循环
break retry;
}
/* 至此,说明上面的原子操作失败了,可能是发生了线程争用 */
// 如果线程池中的任务数量发生了变化,需要重新获取线程池状态标记
state = ctl.get(); // Re-read ctl