forked from kangjianwei/LearningJDK
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExecutorCompletionService.java
281 lines (241 loc) · 11.7 KB
/
ExecutorCompletionService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
/**
* A {@link CompletionService} that uses a supplied {@link Executor}
* to execute tasks. This class arranges that submitted tasks are,
* upon completion, placed on a queue accessible using {@code take}.
* The class is lightweight enough to be suitable for transient use
* when processing groups of tasks.
*
* <p>
*
* <b>Usage Examples.</b>
*
* Suppose you have a set of solvers for a certain problem, each
* returning a value of some type {@code Result}, and would like to
* run them concurrently, processing the results of each of them that
* return a non-null value, in some method {@code use(Result r)}. You
* could write this as:
*
* <pre> {@code
* void solve(Executor e,
* Collection<Callable<Result>> solvers)
* throws InterruptedException, ExecutionException {
* CompletionService<Result> cs
* = new ExecutorCompletionService<>(e);
* solvers.forEach(cs::submit);
* for (int i = solvers.size(); i > 0; i--) {
* Result r = cs.take().get();
* if (r != null)
* use(r);
* }
* }}</pre>
*
* Suppose instead that you would like to use the first non-null result
* of the set of tasks, ignoring any that encounter exceptions,
* and cancelling all other tasks when the first one is ready:
*
* <pre> {@code
* void solve(Executor e,
* Collection<Callable<Result>> solvers)
* throws InterruptedException {
* CompletionService<Result> cs
* = new ExecutorCompletionService<>(e);
* int n = solvers.size();
* List<Future<Result>> futures = new ArrayList<>(n);
* Result result = null;
* try {
* solvers.forEach(solver -> futures.add(cs.submit(solver)));
* for (int i = n; i > 0; i--) {
* try {
* Result r = cs.take().get();
* if (r != null) {
* result = r;
* break;
* }
* } catch (ExecutionException ignore) {}
* }
* } finally {
* futures.forEach(future -> future.cancel(true));
* }
*
* if (result != null)
* use(result);
* }}</pre>
*
* @since 1.5
*/
/*
* 借助Executor和BlockingQueue实现的【任务执行-剥离框架】
*
* 当给定的任务结束后,不管是正常结束,还是异常结束,或者是被取消,都会被存入一个阻塞队列中
* 后续可以从阻塞队列中取出这些已结束的任务,并获取它们的返回值或任务状态
*/
public class ExecutorCompletionService<V> implements CompletionService<V> {
// 【任务执行器】
private final Executor executor;
/**
* 【任务执行框架】
*
* 只有当executor是AbstractExecutorService的实例时,该字段等同于executor,否则,该字段为null。
* 换句话说,当executor为【线程池】ThreadPoolExecutor或【任务池】ForkJoinPool(及其子类)时,aes有效
*/
private final AbstractExecutorService aes;
// 阻塞队列,存储已结束的任务(不管是正常结束,还是异常结束,或者是被取消)
private final BlockingQueue<Future<V>> completionQueue;
/*▼ 构造方法 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and a
* {@link LinkedBlockingQueue} as a completion queue.
*
* @param executor the executor to use
*
* @throws NullPointerException if executor is {@code null}
*/
public ExecutorCompletionService(Executor executor) {
if(executor == null) {
throw new NullPointerException();
}
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* {@code Queue.add} operations for completed tasks cause
* them not to be retrievable.
*
* @throws NullPointerException if executor or completionQueue are {@code null}
*/
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
if(executor == null || completionQueue == null) {
throw new NullPointerException();
}
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
/*▲ 构造方法 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 创建任务 ████████████████████████████████████████████████████████████████████████████████┓ */
// 包装Callable,创建新任务
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if(aes == null) {
return new FutureTask<V>(task);
} else {
return aes.newTaskFor(task);
}
}
// 包装Runnable,创建新任务
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if(aes == null) {
return new FutureTask<V>(task, result);
} else {
return aes.newTaskFor(task, result);
}
}
/*▲ 创建任务 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 提交/执行任务 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// 提交/执行任务,在任务结束后,将任务存入阻塞队列
public Future<V> submit(Callable<V> task) {
if(task == null) {
throw new NullPointerException();
}
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture<V>(f, completionQueue));
// 返回值是Future类型,目的是为了判断任务状态,获取计算结果
return f;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// 提交/执行任务,在任务结束后,将任务存入阻塞队列
public Future<V> submit(Runnable task, V result) {
if(task == null) {
throw new NullPointerException();
}
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture<V>(f, completionQueue));
// 返回值是Future类型,目的是为了判断任务状态,获取计算结果
return f;
}
/*▲ 提交/执行任务 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 获取已结束任务 ████████████████████████████████████████████████████████████████████████████████┓ */
// 从阻塞队列取出/移除的一个已结束任务,可能会被阻塞
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// 从阻塞队列取出/移除的一个已结束任务,不会被阻塞,但可能返回null
public Future<V> poll() {
return completionQueue.poll();
}
// 在指定时间内从阻塞队列取出/移除的一个已结束任务,如果超时,则返回null
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
/*▲ 获取已结束任务 ████████████████████████████████████████████████████████████████████████████████┛ */
/**
* FutureTask extension to enqueue upon completion.
*/
// 增强FutureTask,可以将已结束的任务存储到阻塞队列
private static class QueueingFuture<V> extends FutureTask<Void> {
// 待执行任务
private final Future<V> task;
// 存储已结束的任务
private final BlockingQueue<Future<V>> completionQueue;
QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
// 至此,当前任务已经结束(不管是正常结束,还是异常结束,或者是被取消),所有等待结果的线程也已唤醒
protected void done() {
// 将已结束的任务加入阻塞队列
completionQueue.add(task);
}
}
}