forked from kangjianwei/LearningJDK
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWindowsAsynchronousSocketChannelImpl.java
1358 lines (1126 loc) · 54.9 KB
/
WindowsAsynchronousSocketChannelImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.ch;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.ShutdownChannelGroupException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import jdk.internal.misc.Unsafe;
import sun.net.util.SocketExceptions;
/**
* Windows implementation of AsynchronousSocketChannel using overlapped I/O.
*/
// 异步Socket通道的本地实现
class WindowsAsynchronousSocketChannelImpl extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel {
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
* typedef struct _WSABUF {
* u_long len;
* char FAR * buf;
* } WSABUF;
*/
private static final int SIZEOF_WSABUF = dependsArch(8, 16); // 目标/源头缓冲区基础信息,用在读/写操作中
private static final int OFFSETOF_LEN = 0;
private static final int OFFSETOF_BUF = dependsArch(4, 8);
/** maximum vector size for scatter/gather I/O */
private static final int MAX_WSABUF = 16;
private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
/** socket handle. Use begin()/end() around each usage of this handle. */
final long handle; // [客户端Socket]/[服务端Socket(通信)]在本地(native层)的文件描述符
/** completion key to identify channel when I/O completes */
private final int completionKey; // 完成键,用来关联通道
/** I/O completion port that the socket is associated with */
private final Iocp iocp; // 通道组
/**
* Pending I/O operations are tied to an OVERLAPPED structure that can only
* be released when the I/O completion event is posted to the completion port.
* Where I/O operations complete immediately then it is possible
* there may be more than two OVERLAPPED structures in use.
*/
private final PendingIoCache ioCache; // 重叠IO结构的缓存池
/** per-channel arrays of WSABUF structures */
private final long readBufferArray; // 目标缓冲区基础信息
private final long writeBufferArray; // 源头缓冲区基础信息
// 本地指针长度,通常为4字节或8字节
private static int addressSize = unsafe.addressSize();
static {
IOUtil.load();
initIDs();
}
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) throws IOException {
super(iocp);
/*
* 获取Java层的文件描述符fd在本地(native层)的引用值。
* fd是[客户端Socket]/[服务端Socket(通信)]在Java层的文件描述符。
*/
long handle = IOUtil.fdVal(fd);
int key = 0;
try {
/*
* 将指定Socket的引用handle关联到"完成端口"上,并在keyToChannel中记录handle所在通道(支持重叠IO结构)的引用。
* 返回值为与通道channel建立关联的完成键。
*/
key = iocp.associate(this, handle);
} catch(ShutdownChannelGroupException x) {
if(failIfGroupShutdown) {
closesocket0(handle);
throw x;
}
} catch(IOException x) {
closesocket0(handle);
throw x;
}
this.handle = handle;
this.iocp = iocp;
this.completionKey = key;
this.ioCache = new PendingIoCache();
// allocate WSABUF arrays
this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
}
WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
this(iocp, true);
}
// 存在安全管理器的情形下,需要使用此方法执行【bind】操作
private void doPrivilegedBind(final SocketAddress sa) throws IOException {
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
public Void run() throws IOException {
bind(sa);
return null;
}
});
} catch(PrivilegedActionException e) {
throw (IOException) e.getException();
}
}
// 实现异步IO中的"connect"操作
@Override
<A> Future<Void> implConnect(SocketAddress remote, A attachment, CompletionHandler<Void, ? super A> handler) {
// 如果通道处于关闭状态,则需要给出异常提示
if(!isOpen()) {
Throwable exc = new ClosedChannelException();
// 未设置回调handler时,直接包装异常信息
if(handler == null) {
return CompletedFuture.withFailure(exc);
}
// 处理回调句柄,会视情形进行直接处理或间接处理
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
// 检查指定的Socket地址是否合规
InetSocketAddress isa = Net.checkAddress(remote);
SecurityManager sm = System.getSecurityManager();
if(sm != null) {
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
}
/*
* check and update state.
* ConnectEx requires the socket to be bound to a local address.
*/
IOException bindException = null;
// 如果还未绑定,这里需要进行绑定操作
synchronized(stateLock) {
// 如果客户端与服务端已建立连接,则抛异常
if(state == ST_CONNECTED) {
throw new AlreadyConnectedException();
}
// 如果客户端已经准备与服务端建立连接,则抛异常
if(state == ST_PENDING) {
throw new ConnectionPendingException();
}
if(localAddress == null) {
try {
// 使用通配IP和随机端口号初始化Socket地址
SocketAddress any = new InetSocketAddress(0);
// 如果不存在安全管理器,则直接进行绑定
if(sm == null) {
bind(any);
// 如果存在安全管理器,则需要经过权限校验
} else {
doPrivilegedBind(any);
}
} catch(IOException x) {
bindException = x;
}
}
// 如果绑定成功,则进入下一步:指示客户端准备与服务端建立连接
if(bindException == null) {
state = ST_PENDING;
}
}
// 如果这里出现异常,则需要反馈异常信息,并退出connect操作
if(bindException != null) {
try {
// 由于绑定失败,所以关闭通道
close();
} catch(IOException ignore) {
}
// 未设置回调handler时,直接包装异常信息
if(handler == null) {
return CompletedFuture.withFailure(bindException);
}
// 处理回调句柄,会视情形进行直接处理或间接处理
Invoker.invoke(this, handler, attachment, null, bindException);
return null;
}
// 创建在Java层挂起任务,等待底层执行完之后通知Java层
PendingFuture<Void, A> future = new PendingFuture<>(this, handler, attachment);
// 构造一个"connect"任务
ConnectTask<A> task = new ConnectTask<A>(isa, future);
// 为future设置上下文,即设置实际需要执行的操作
future.setContext(task);
// 执行"connect"操作,connect结束后,会通知阻塞的工作线程
task.run();
return future;
}
// 实现异步IO中的读取操作
@Override
<V extends Number, A> Future<V> implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V, ? super A> handler) {
// 目标缓冲区组:存储从当前通道读取到的数据
ByteBuffer[] bufs;
// 如果需要散射读(即将单个通道中的内容读取到多个目标缓冲区)
if(isScatteringRead) {
bufs = dsts;
} else {
bufs = new ByteBuffer[1];
bufs[0] = dst;
}
// 创建在Java层挂起任务,等待底层执行完之后通知Java层
PendingFuture<V, A> future = new PendingFuture<>(this, handler, attachment);
// 构造一个"读取"任务
final ReadTask<V, A> readTask = new ReadTask<>(bufs, isScatteringRead, future);
// 为future设置上下文,即设置实际需要执行的操作
future.setContext(readTask);
/*
* 如果设置了超时时间,则需要安排一个定时任务做楔子;
* 等到了截止时间时,会通过该楔子判断当前异步任务是否完成。
* 如果超时了还没完成,那就得抛出超时异常了。
*/
if(timeout>0L) {
// 待执行的一次性定时任务
Runnable task = new Runnable() {
public void run() {
readTask.timeout();
}
};
// 执行一次性的定时任务task,并返回任务本身:在任务启动后的timeout时长后开始执行
Future<?> timeoutTask = iocp.schedule(task, timeout, unit);
// 设置timeoutTask作为楔子,在当前异步IO操作超时的时候给出提醒
future.setTimeoutTask(timeoutTask);
}
// 执行"读取"操作,读取结束后,会通知阻塞的工作线程
readTask.run();
return future;
}
// 实现异步IO中的写入操作
@Override
<V extends Number, A> Future<V> implWrite(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V, ? super A> handler) {
// 源头缓冲区组:存储向当前通道写入的数据
ByteBuffer[] bufs;
// 如果需要聚集写(即将多个源头缓存区的内容发送到单个通道)
if(isGatheringWrite) {
bufs = srcs;
} else {
bufs = new ByteBuffer[1];
bufs[0] = src;
}
// 在Java层挂起任务,等待底层执行完之后通知Java层
PendingFuture<V, A> future = new PendingFuture<>(this, handler, attachment);
// 构造"写入"操作
final WriteTask<V, A> writeTask = new WriteTask<>(bufs, isGatheringWrite, future);
// 为future设置上下文,即设置实际需要执行的操作
future.setContext(writeTask);
/*
* 如果设置了超时时间,则需要安排一个定时任务做楔子;
* 等到了截止时间时,会通过该楔子判断当前异步任务是否完成。
* 如果超时了还没完成,那就得抛出超时异常了。
*/
if(timeout>0L) {
// 待执行的一次性定时任务
Runnable task = new Runnable() {
public void run() {
writeTask.timeout();
}
};
// 执行一次性的定时任务task,并返回任务本身:在任务启动后的timeout时长后开始执行
Future<?> timeoutTask = iocp.schedule(task, timeout, unit);
// 设置timeoutTask作为楔子,在当前异步IO操作超时的时候给出提醒
future.setTimeoutTask(timeoutTask);
}
// 执行"写入"操作,写入结束后,会通知阻塞的工作线程
writeTask.run();
return future;
}
// 实现对异步IO通道的关闭操作
@Override
void implClose() throws IOException {
/* close socket (may cause outstanding async I/O operations to fail). */
// 关闭[客户端Socket]
closesocket0(handle);
/* waits until all I/O operations have completed */
// 关闭重叠IO结构的缓存池
ioCache.close();
/* release arrays of WSABUF structures */
// 释放一些占用的本地内存
unsafe.freeMemory(readBufferArray);
unsafe.freeMemory(writeBufferArray);
/*
* finally disassociate from the completion port
* (key can be 0 if channel created when group is shutdown)
*/
if(completionKey != 0) {
// 解除当前通道与完成键的关联
iocp.disassociate(completionKey);
}
}
// 取消异步IO操作时的回调
@Override
public void onCancel(PendingFuture<?, ?> task) {
if(task.getContext() instanceof ConnectTask) {
// 中止读取与写入
killConnect();
}
if(task.getContext() instanceof ReadTask) {
// 中止读取
killReading();
}
if(task.getContext() instanceof WriteTask) {
// 中止写入
killWriting();
}
}
/**
* Invoked by Iocp when an I/O operation competes.
*/
/*
* 从任务结果映射集移除一条记录,并返回移除掉的重叠IO结构缓存池
* 重叠IO结构被移除下来后,会先尝试将其缓存,缓存池已满时则直接释放重叠IO结构的本地内存
*/
@Override
public <V, A> PendingFuture<V, A> getByOverlapped(long overlapped) {
return ioCache.remove(overlapped);
}
// 返回异步IO通道组,这是对完成端口的包装
@Override
public AsynchronousChannelGroupImpl group() {
return iocp;
}
/** invoked by WindowsAsynchronousServerSocketChannelImpl */
// 返回当前通道的句柄
long handle() {
return handle;
}
/** invoked by WindowsAsynchronousServerSocketChannelImpl when new connection accept */
// 设置[服务端Socket(通信)]的本地地址与远程地址
void setConnected(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
synchronized(stateLock) {
state = ST_CONNECTED;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
}
private static int dependsArch(int value32, int value64) {
return (addressSize == 4) ? value32 : value64;
}
private static native void initIDs();
private static native int connect0(long socket, boolean preferIPv6, InetAddress remote, int remotePort, long overlapped) throws IOException;
private static native void updateConnectContext(long socket) throws IOException;
private static native int read0(long socket, int count, long addres, long overlapped) throws IOException;
private static native int write0(long socket, int count, long address, long overlapped) throws IOException;
private static native void shutdown0(long socket, int how) throws IOException;
private static native void closesocket0(long socket) throws IOException;
/**
* Implements the task to initiate a connection and the handler to
* consume the result when the connection is established (or fails).
*/
// 异步IO操作:进行connect操作,即客户端与服务端建立连接
private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
private final InetSocketAddress remote; // 需要连接到的远端Socket的地址
private final PendingFuture<Void, A> future; // 在Java层挂起的任务,等待填充执行结果
ConnectTask(InetSocketAddress remote, PendingFuture<Void, A> future) {
this.remote = remote;
this.future = future;
}
/**
* Task to initiate a connection.
*/
@Override
public void run() {
long overlapped = 0L;
Throwable exc = null;
try {
// 添加一个读锁
begin();
/*
* synchronize on future to allow this thread handle the case
* where the connection is established immediately.
*/
synchronized(future) {
/*
* 将future与overlapped建立关联
*
* 向重叠IO结构缓存池ioCache中存储一个键值对<overlapped, future>,
* 即将一个OVERLAPPED结构(如果不存在则新建)与future进行绑定,并返回OVERLAPPED结构的本地引用。
*/
overlapped = ioCache.add(future);
/* initiate the connection */
// 当前Socket向指定地址处的ServerSocket发起连接
int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), remote.getPort(), overlapped);
/*
* 情形1.1:当前IO操作有成效
*
* 如果本地反馈IOStatus.UNAVAILABLE消息,
* 说明别的IO操作正在进行,当前IO操作已进入队列排队。
*
* 此种情形下需要挂起future以待后续处理。
*/
if(n == IOStatus.UNAVAILABLE) {
// connection is pending
return;
}
/*
* 情形1.2:当前IO操作有成效:客户端成功与服务端建立了连接。
*
* 此种情形下需要进行一些关于connect的收尾处理。
*/
/* connection established immediately */
// 客户端与服务端connect成功后需要执行的一些收尾操作
afterConnect();
// 设置执行结果为null,因为connect无返回值
future.setResult(null);
}
} catch(Throwable e) {
/*
* 情形2:当前IO操作没成效,而且抛出了异常
*
* 此种情形下不需要挂起future,可以立即填充执行结果。
*/
// 如果出状况之前设置了重叠IO结构
if(overlapped != 0L) {
// 从任务结果映射集移除一条记录;会尝试缓存重叠IO结构,缓存池已满时则直接释放重叠IO结构的本地内存
ioCache.remove(overlapped);
}
exc = e;
} finally {
// 移除一个读锁
end();
}
// connect过程中出现了异常
if(exc != null) {
// 关闭异步通道
closeChannel();
// 构造一条异常信息
exc = SocketExceptions.of(toIOException(exc), remote);
// 如果connect过程中出现了异常,则设置任务执行结果为异常
future.setFailure(exc);
}
// 当异步IO操作已有执行结果时,接下来处理future中记录的回调句柄
Invoker.invoke(future);
}
/**
* Invoked by handler thread when connection established.
*/
// 当IO线程执行完"connect"操作后,唤醒工作线程,在工作线程中调用此方法设置任务执行的结果
@Override
public void completed(int bytesTransferred, boolean canInvokeDirect) {
Throwable exc = null;
try {
// 添加一个读锁
begin();
// 客户端与服务端connect成功后需要执行的一些收尾操作
afterConnect();
// 设置任务执行的结果
future.setResult(null);
} catch(Throwable e) {
/* channel is closed or unable to finish connect */
exc = e;
} finally {
// 移除一个读锁
end();
}
/* can't close channel while in begin/end block */
if(exc != null) {
// 关闭异步通道
closeChannel();
// 生成一条异常信息
IOException e = SocketExceptions.of(toIOException(exc), remote);
// 设置异常信息
future.setFailure(e);
}
if(canInvokeDirect) {
// 直接处理future中记录的回调句柄,不会改变当前线程的递归调用深度
Invoker.invokeUnchecked(future);
} else {
// 当异步IO操作已有执行结果时,接下来处理future中记录的回调句柄
Invoker.invoke(future);
}
}
/**
* Invoked by handler thread when failed to establish connection.
*/
// 当IO线程执行完"connect"操作后,唤醒工作线程,在工作线程中调用此方法设置任务执行中的异常信息
@Override
public void failed(int error, IOException e) {
e = SocketExceptions.of(e, remote);
if(isOpen()) {
// 关闭异步通道
closeChannel();
// 如果通道未关闭,则设置给定的异常信息
future.setFailure(e);
} else {
// 生成一条异常信息
e = SocketExceptions.of(new AsynchronousCloseException(), remote);
// 如果通道已关闭,则设置一个通道已关闭的异常
future.setFailure(e);
}
// 当异步IO操作已有执行结果时,接下来处理future中记录的回调句柄
Invoker.invoke(future);
}
/**
* Invoke after a connection is successfully established.
*/
// 客户端与服务端connect成功后需要执行的一些收尾操作
private void afterConnect() throws IOException {
/*
* 更新[客户Socket]的一些连接信息
* 参见:https://stackoverflow.com/questions/9169086/tcp-shutdown-with-sockets-connected-through-acceptex
*/
updateConnectContext(handle);
synchronized(stateLock) {
// 指示客户端与服务端已建立连接
state = ST_CONNECTED;
// 记录已连接的远端Socket地址
remoteAddress = remote;
}
}
// 关闭异步通道
private void closeChannel() {
try {
close();
} catch(IOException ignore) {
}
}
// 异常类型转换
private IOException toIOException(Throwable x) {
if(x instanceof IOException) {
if(x instanceof ClosedChannelException) {
x = new AsynchronousCloseException();
}
return (IOException) x;
}
return new IOException(x);
}
}
/**
* Implements the task to initiate a read and the handler to consume the
* result when the read completes.
*/
// 异步IO操作:进行读取操作,读取当前通道中的数据,并将其存储到指定的缓冲区中
private class ReadTask<V, A> implements Runnable, Iocp.ResultHandler {
private final boolean scatteringRead; // 是否为散射读(即将单个通道中的内容读取到多个目标缓冲区)
private final ByteBuffer[] bufs; // 目标缓冲区组:存储从当前通道读取到的数据
private final int numBufs; // 目标缓冲区的数量
private ByteBuffer[] shadow; // 影子缓冲区组,暂时存储将要读到的数据
private final PendingFuture<V, A> future; // 在Java层挂起的任务,等待填充执行结果
ReadTask(ByteBuffer[] bufs, boolean scatteringRead, PendingFuture<V, A> future) {
this.bufs = bufs;
this.numBufs = Math.min(bufs.length, MAX_WSABUF);
this.scatteringRead = scatteringRead;
this.future = future;
}
@Override
@SuppressWarnings("unchecked")
public void run() {
long overlapped = 0L; // OVERLAPPED结构的本地引用
boolean prepared = false; // 指示目标缓冲区是否已经准备好
boolean pending = false; // 指示读取操作是否已经挂起
try {
// 添加一个读锁
begin();
/* substitute non-direct buffers */
// 准备目标缓冲区,以接收即将读取到的数据
prepareBuffers();
// 已准备好目标缓冲区
prepared = true;
/* get an OVERLAPPED structure (from the cache or allocate) */
/*
* 将future与overlapped建立关联
*
* 向重叠IO结构缓存池ioCache中存储一个键值对<overlapped, future>,
* 即将一个OVERLAPPED结构(如果不存在则新建)与future进行绑定,并返回OVERLAPPED结构的本地引用。
*/
overlapped = ioCache.add(future);
// 从handle处的通道中读取数据,读取到的数据将暂时存储到shadow中
int n = read0(handle, numBufs, readBufferArray, overlapped);
/*
* 情形1:当前IO操作有成效
*
* 本地反馈IOStatus.UNAVAILABLE消息有两种原因:
* 1.读取成功
* 2.读取失败,失败原因是别的IO操作正在进行,当前IO操作已进入队列排队
*
* 此种情形下需要挂起future以待后续处理。
*/
if(n == IOStatus.UNAVAILABLE) {
// 标记future任务已挂起,需要后续填充IO操作的结果
pending = true;
return;
}
/*
* 情形2.1:当前IO操作没成效,原因是Socket通道已关闭
*
* 此时无法读取内容,则将执行结果设置为-1。
*/
if(n == IOStatus.EOF) {
// 结束读取,并非因为超时
enableReading();
// 直接设置执行结果为EOF
if(scatteringRead) {
future.setResult((V) Long.valueOf(-1L));
} else {
future.setResult((V) Integer.valueOf(-1));
}
/*
* 情形2.2:当前IO操作没成效,而且抛出了异常
*
* 本地产生了其他错误消息的话,直接抛异常。
*
* 此种情形下不需要挂起future,可以立即填充执行结果。
*/
} else {
throw new InternalError("Read completed immediately");
}
} catch(Throwable e) {
/*
* failed to initiate read.
* reset read flag before releasing waiters.
*/
// 结束读取,并非因为超时
enableReading();
if(e instanceof ClosedChannelException) {
e = new AsynchronousCloseException();
}
if(!(e instanceof IOException)) {
e = new IOException(e);
}
// 如果读取过程中出现了异常,则设置任务执行结果为异常
future.setFailure(e);
} finally {
/* release resources if I/O not pending */
/*
* 如果当前IO操作没成效,那么当场就会被设置任务执行结果。
* 在此种情形下,也就没必要挂起future了。
* 因此,此处的工作就是将overlapped与相应的future取消关联。
*/
if(!pending) {
// 如果出状况之前设置了重叠IO结构
if(overlapped != 0L) {
// 从任务结果映射集移除一条记录;会尝试缓存重叠IO结构,缓存池已满时则直接释放重叠IO结构的本地内存
ioCache.remove(overlapped);
}
// 如果目标缓冲区已经准备好,此处需要释放其空间
if(prepared) {
// 如果目标缓冲区是直接缓冲区,则释放其本地内存
releaseBuffers();
}
}
// 移除一个读锁
end();
}
// 当异步IO操作已有执行结果时,接下来处理future中记录的回调句柄
Invoker.invoke(future);
}
/**
* Executed when the I/O has completed
*/
// 当IO线程执行完"读取"操作后,唤醒工作线程,在工作线程中调用此方法设置任务执行的结果
@Override
@SuppressWarnings("unchecked")
public void completed(int bytesTransferred, boolean canInvokeDirect) {
// 如果没读到有效字节,说明遇到了EOF
if(bytesTransferred == 0) {
bytesTransferred = -1; // EOF
} else {
// 将读到的字节存入目标缓冲区
updateBuffers(bytesTransferred);
}
// 如果目标缓冲区是直接缓冲区,则释放其本地内存
releaseBuffers();
// release waiters if not already released by timeout
synchronized(future) {
// 如果任务已经执行完成(之前设置过了执行结果),则直接返回
if(future.isDone()) {
return;
}
// 结束读取,并非因为超时
enableReading();
// 设置执行结果为读到的字节数
if(scatteringRead) {
future.setResult((V) Long.valueOf(bytesTransferred));
} else {
future.setResult((V) Integer.valueOf(bytesTransferred));
}
}
if(canInvokeDirect) {
// 直接处理future中记录的回调句柄,不会改变当前线程的递归调用深度
Invoker.invokeUnchecked(future);
} else {
// 当异步IO操作已有执行结果时,接下来处理future中记录的回调句柄
Invoker.invoke(future);
}
}
// 当IO线程执行完"读取"操作后,唤醒工作线程,在工作线程中调用此方法设置任务执行中的异常信息
@Override
public void failed(int error, IOException e) {
// 如果目标缓冲区是直接缓冲区,则释放其本地内存
releaseBuffers();
/* release waiters if not already released by timeout */
// 如果通道已经关闭,则需要更新异常为"关闭异常"
if(!isOpen()) {
e = new AsynchronousCloseException();
}
synchronized(future) {
// 如果任务已经执行完成(之前设置过了执行结果),则直接返回
if(future.isDone()) {
return;
}
// 结束读取,并非因为超时
enableReading();
// 如果出现了异常,则设置任务执行结果为相应的异常
future.setFailure(e);
}
// 当异步IO操作已有执行结果时,接下来处理future中记录的回调句柄
Invoker.invoke(future);
}
/**
* Invoked prior to read to prepare the WSABUF array. Where necessary,
* it substitutes non-direct buffers with direct buffers.
*/
// 准备目标缓冲区,以接收即将读取到的数据
void prepareBuffers() {
// 影子缓冲区组,暂时存储将要读取到的数据
shadow = new ByteBuffer[numBufs];
// 目标缓冲区基础信息,包含目标缓冲区地址与目标缓冲区的剩余空间信息
long address = readBufferArray;
// 遍历每个目标缓冲区
for(int i = 0; i<numBufs; i++) {
ByteBuffer dst = bufs[i];
int pos = dst.position();
int lim = dst.limit();
assert (pos<=lim);
int rem = (pos<=lim ? lim - pos : 0);
long addr;
// 如果目标缓冲区不是直接缓冲区,则需要创建一个影子缓冲区暂时存储将要读到的数据
if(!(dst instanceof DirectBuffer)) {
// 获取一块容量至少为rem个字节的直接缓冲区
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
// 记下目标缓冲区
shadow[i] = bb;
// 目标缓冲区地址
addr = ((DirectBuffer) bb).address();
// 如果目标缓冲区已经是直接缓冲区,则直接使用当前目标缓冲区
} else {
// 记下目标缓冲区
shadow[i] = dst;
// 目标缓冲区地址
addr = ((DirectBuffer) dst).address() + pos;
}
unsafe.putAddress(address + OFFSETOF_BUF, addr); // 记录目标缓冲区地址
unsafe.putInt(address + OFFSETOF_LEN, rem); // 记录目标缓冲区的剩余空间
address += SIZEOF_WSABUF;
}
}
/**
* Invoked after a read has completed to update the buffer positions
* and release any substituted buffers.
*/
// 将读到的字节存入目标缓冲区
void updateBuffers(int bytesRead) {
// 遍历目标缓冲区,设置好游标
for(int i = 0; i<numBufs; i++) {
ByteBuffer nextBuffer = shadow[i];
int pos = nextBuffer.position();
int len = nextBuffer.remaining();
// 如果剩余的字节数大于目标缓冲区剩余的空间,则可以直接填满当前目标缓冲区
if(bytesRead >= len) {
bytesRead -= len;
int newPosition = pos + len;
try {
nextBuffer.position(newPosition);
} catch(IllegalArgumentException x) {
// position changed by another
}
// 剩余字节数已经小于当前目标缓冲区的剩余空间,则把剩余的字节填充到目标缓冲区
} else {
// Buffers not completely filled
if(bytesRead>0) {
assert (pos + bytesRead<(long) Integer.MAX_VALUE);
int newPosition = pos + bytesRead;
try {
nextBuffer.position(newPosition);
} catch(IllegalArgumentException x) {
// position changed by another
}
}
break;
}
}
// Put results from shadow into the slow buffers
for(int i = 0; i<numBufs; i++) {
// 如果原先的目标缓冲区已经是直接缓冲区,则忽略此目标缓冲区
if(bufs[i] instanceof DirectBuffer) {
continue;
}
// 将影子缓冲区从写模式切换到读模式
shadow[i].flip();
try {
// 将影子缓冲区中的数据转入原先给出的目标缓冲区中
bufs[i].put(shadow[i]);
} catch(BufferOverflowException x) {
// position changed by another
}
}
}
// 如果目标缓冲区是直接缓冲区,则释放其本地内存
void releaseBuffers() {
for(int i = 0; i<numBufs; i++) {
if(!(bufs[i] instanceof DirectBuffer)) {
// 采用FILO的形式(入栈模式)将shadow[i]放入Buffer缓存池以待复用
Util.releaseTemporaryDirectBuffer(shadow[i]);