Netty Future & Promise

摘要:
Netty 中的所有 IO 操作都是异步的,也就是说任何 IO 调用都将立即返回,返回的 ChannelFuture 实例会在操作完成时,通知用户该操作的状态、结果。

异步调用

我们经常听到同步、异步,为了理解,通常都是通过举例来说明。
比如,
同步调用,好比你在包子店点了一笼小笼包,一直在柜台等到小笼包蒸熟,然后开始吃;
异步调用,好比你在包子店点了一笼小笼包,老板给了你一个小票(凭证),你拿着小票可以去干别的事情,等到差不多时候,你 自己主动 拿着小票去取小笼包来吃(Future),当然还有一种更好的方式是,你的小笼包蒸熟后,老板直接电话 通知 你凭小票来取小笼包(Promise)。

但是好像很少有人给出它们的定义,举例有助于概念理解,定义有助于精确理解。
下面是我个人的一些理解。

什么是同步调用?

  • 调用一个接一个执行,上一个调用函数执行完成,下一个调用才能开始执行
  • 好处是简单,与人的日常思维习惯相符

那什么是异步调用(asynchronous call)?

  • 调用方无须等待调用的函数执行完成,就可继续向下执行
  • 好处是适用场景下(阻塞任务、不关心执行结果,处理不确定事件)会非常高效,可以显著提升性能

异步调用与阻塞

  • 异步调用与阻塞是两回事
  • 大多数情况下,我们使用异步调用,是为了规避阻塞带来的性能降低。
  • 非阻塞的调用,我们照样可以使用异步调用,比如统计操作信息不阻塞,我们不关心操作结果,照样使用异步调用实现

异步调用与多线程

  • 异步调用与多线程,也是两回事
  • 异步调用,绝大多数是通过多线程实现的,方便、逻辑简单
  • 单线程照样可以实现异步调用,一般会通过任务队列管理机制实现单线程的异步调用

异步调用与MQ(消息队列)

  • 异步调用,原理上与 MQ 是相同的:一个异步调用就相当于发出一个执行特定任务的消息,消息发出去就可以了,无需等到任务完成
  • 类似于生产者-消费者模型,通过中间队列,将生产者和消费者关系解耦
  • MQ 是进程维度异步调用的一种实现方式

异步调用与时序

  • 异步调用,跟时序也是两回事,异步调用需不需要保证时序,要看业务需要
  • 业务常常要求具有某种有序关系的异步调用

异步调用与事件驱动

  • 事件驱动机制,常常应用于网络编程、前端(桌面 GUI/Web 浏览器: 键盘输入、鼠标点击/移动 …)领域:事件监听器 Listener、事件调度/分发器 Mediator/Dispatcher、事件处理器 Handler
  • 事件驱动机制,底层都是使用的异步调用,接收到事件,异步处理事件
  • 通过事件驱动机制,异步调用可以很方便实现一串时序逻辑
  • 异步调用与事件驱动结合的系统,会表现出一种特殊的、极具价值的行为:系统可以以任意的顺序响应在任意的时间点产生的事件。(引用自 《Netty in Action.v5》)

Netty 是异步的

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

官网介绍,Netty 是异步的。

  • 一方面,Netty 使用异步 IO 模型,或者更精确地说是同步非阻塞 IO。这个指的 IO 系统调用层面,IO 读写等待就绪后,才会执行读写,不会阻塞线程,已经在 NIO 里详细说了;
  • 另一方面,Netty 中的所有 IO 操作都是异步的。这个指的是用户调用层面,也就是说任何 IO 调用都将立即返回,返回的 ChannelFuture 实例会在操作完成时,通知用户该操作的状态、结果。

Netty Future 体系

Netty 异步 IO 操作,使用 Future、Promise 实现。
Future、Promise 都可以理解为未来任务的一种凭证(期权?),将来任务完成,总是可以通过这个凭证,拿到任务的状态、结果。
future-hierarchy

Netty 模块化做的特别好,Future、Promise 功能,统一封装在 netty-common 模块里(io.netty.util.concurrent 包),可以独立作为通用库使用。

JDK Future

java.util.concurrent.Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* A {@code Future} represents the result of an asynchronous computation.
*/
public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

/**
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return {@code true}.
*/
boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

JDK Future 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class FutureDemo {

private static void testFuture() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<Integer> future = threadPool.submit(
() -> {
for (int i = 0; i < 10; i++) {
Thread.sleep(1 * 1000L);
System.out.printf("%s === sleep %d\n", Thread.currentThread().getName(), i);
if (i == 5) {
// throw new RuntimeException("error occured!");
}
}
return new Random().nextInt(100);
});

System.out.printf("%s === isDone: %s\n", Thread.currentThread().getName(), future.isDone());
// java.util.concurrent.FutureTask
// System.out.println(future.getClass().getCanonicalName());

long start = System.currentTimeMillis();
try {
// Integer result = future.get();
Integer result = future.get(20, TimeUnit.SECONDS);
System.out.println(result);
} catch (InterruptedException e) {
System.out.printf("InterruptedException (%s) \n", e.getMessage());
} catch (ExecutionException e) {
System.out.printf("ExecutionException (%s) \n", e.getMessage());
} catch (TimeoutException e) {
// mayInterruptIfRunning: true, false
boolean cancel = future.cancel(false);
System.out.printf("cancel() -> %s\n", cancel);
System.out.printf("isCancelled: %s\n", future.isCancelled());
}
System.out.printf("%s === cost: %s\n", Thread.currentThread().getName(), System.currentTimeMillis() - start);

threadPool.shutdown();
}

public static void main(String[] args) {
testFuture();
}

}

JDK Future 缺点

仔细思考 JDK Future,会发现两个问题:

  1. 只有一个 isDone() 方法,判断异步操作是否完成,但是对于完成的定义过于模糊
    正常终止、抛出异常、用户取消,都会使 isDone() 返回 true
    实际使用中,常常需要对这三种情况分别处理

  2. 需要你主动获取操作状态、结果,检查操作是否已经完成,或者一直阻塞直到它完成
    对于一个异步操作,我们更关心的是这个异步操作结束后能否再执行一系列动作。
    如果是主动获取(拉模式),很难精确掌握获取结果的时间,拉取早了,当前线程阻塞,晚了,后续操作又不及时;
    最好是,异步任务执行完了,通知用户(推模式),以便用户继续后面操作。

正是因为 JDK Future 存在上面两个问题,Netty 扩展了自己的 Future。

Future

io.netty.util.concurrent.Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* The result of an asynchronous operation.
*/
public interface Future<V> extends java.util.concurrent.Future<V> {

/**
* Returns {@code true} if and only if the I/O operation was completed successfully.
*/
boolean isSuccess();

/**
* Returns the cause of the failed I/O operation if the I/O operation has failed.
*/
Throwable cause();

boolean isCancellable();

/**
* Adds the specified listener to this future. The specified listener is notified when this future is done.
* If this future is already completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

/**
* Waits for this future until it is done,
* and rethrows the cause of the failure if this future failed.
*/
Future<V> sync() throws InterruptedException;

/**
* Waits for this future to be completed.
*/
Future<V> await() throws InterruptedException;

/**
* Return the result without blocking. If the future is not done yet this will return {@code null}.
*/
V getNow();
}

Netty Future 解决了 JDK Future 的两个问题:

  1. Future 有两种状态 Uncompleted、Completed,其中 Completed 又有三种状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
                                         +---------------------------+
    | Completed successfully |
    +---------------------------+
    +----> isDone() = true |
    +--------------------------+ | | isSuccess() = true |
    | Uncompleted | | +===========================+
    +--------------------------+ | | Completed with failure |
    | isDone() = false | | +---------------------------+
    | isSuccess() = false |----+----> isDone() = true |
    | isCancelled() = false | | | cause() = non-null |
    | cause() = null | | +===========================+
    +--------------------------+ | | Completed by cancellation |
    | +---------------------------+
    +----> isDone() = true |
    | isCancelled() = true |
    +---------------------------+
  2. addListener(listener),可以添加操作完成后的回调操作

  3. await()、sync(),可以以阻塞的方式等待异步操作完成

Future 是只读的 read-only,操作的状态、结果是不可变的,只能读取,不能修改,因此 Netty 还提供了可写的 Future,也即 Promise。

Future 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class FutureDemo {

public static void main(String[] args) {
EventExecutorGroup threadPool = new DefaultEventExecutorGroup(4);
for (int i = 0; i < 10; i++) {
Future<Integer> future = threadPool.submit(
new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for (int j = 0; j < 4; j++) {
Thread.sleep(1 * 1000L);
System.out.printf("%s === sleep 1s\n", Thread.currentThread().getName());
if (j == 2) {
// throw new RuntimeException("error occured!");
}
}
return new Random().nextInt(100);
}
}
);

// io.netty.util.concurrent.PromiseTask
// System.out.println(future.getClass().getCanonicalName());

future.addListener(
new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
if (future.isSuccess()) {
Integer result = future.get();
System.out.printf("%s === 任务成功, result=%s\n", Thread.currentThread().getName(), result);
// do something when task success
if (result > 50) {
System.out.printf("%s === GOOD, result > 50\n", Thread.currentThread().getName());
} else {
System.out.printf("%s === SAD\n", Thread.currentThread().getName());
}
} else if (future.cause() != null) {
System.out.printf("%s === 任务失败, cause is %s\n", Thread.currentThread().getName(), future.cause().getMessage());
} else {// isCancelled()
System.out.printf("%s === 任务被取消, isCancelled=%s\n", Thread.currentThread().getName(), future.isCancelled());
}
}
}
);
}

System.out.printf("%s === group.shutdownGracefully begin ...\n", Thread.currentThread().getName());
Future<?> future = threadPool.shutdownGracefully();
try {
future.sync();
System.out.printf("%s === group.shutdownGracefully END.\n", Thread.currentThread().getName());
} catch (InterruptedException e) {
// ignore
}
}
}

Promise

io.netty.util.concurrent.Promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Special {@link Future} which is writable.
*/
public interface Promise<V> extends Future<V> {

/**
* Marks this future as a success and notifies all listeners.
* If it is success or failed already, it will throw an {@link IllegalStateException}.
*/
Promise<V> setSuccess(V result);

/**
* Marks this future as a success and notifies all listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a success. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean trySuccess(V result);

Promise<V> setFailure(Throwable cause);

boolean tryFailure(Throwable cause);

/**
* Make this future impossible to cancel.
*
* @return {@code true} if and only if successfully marked this future as uncancellable
* or it is already done without being cancelled.
* {@code false} if this future has been cancelled already.
*/
boolean setUncancellable();

// 覆写了 Future 部分方法,返回值限定为 Promise
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> sync() throws InterruptedException;
}

Promise 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class PromiseDemo {

static class PrintTask implements Runnable {

private Promise promise;

public PrintTask(Promise promise) {
this.promise = promise;
}

@Override
public void run() {
try {
int times = this.doWork();
promise.setSuccess(times);
} catch (Exception e) {
promise.setFailure(e);
}
}

private int doWork() throws Exception {
boolean bool = true;
if (bool) {
// throw new RuntimeException("error occured!");
}
return new Random().nextInt(10) + 1;
}
}

public static void main(String[] args) {
EventExecutor executor = new DefaultEventExecutor();
Promise promise = executor.newPromise();
// io.netty.util.concurrent.DefaultPromise
// System.out.println(promise.getClass().getCanonicalName());

promise.addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
if (future.isSuccess()) {
Integer result = future.get();
System.out.printf("%s === 任务成功, result=%s\n", Thread.currentThread().getName(), result);
// do something when task success
if (result > 5) {
System.out.printf("%s === GOOD, result > 50\n", Thread.currentThread().getName());
} else {
System.out.printf("%s === SAD\n", Thread.currentThread().getName());
}
} else if (future.cause() != null) {
System.out.printf("任务失败, cause is %s\n", future.cause().getMessage());
} else {// isCancelled()
System.out.printf("任务被取消, isCancelled=%s\n", future.isCancelled());
}
}
});

executor.submit(new PrintTask(promise));

// 阻塞主线程
// Promise promiseSync = executor.newPromise();
// try {
// promiseSync.sync();
// } catch (InterruptedException e) {
// // ignore
// }

System.out.printf("%s === executor.shutdownGracefully\n", Thread.currentThread().getName());
executor.shutdownGracefully();
}
}

先看看 Promise promise = executor.newPromise();
DefaultEventExecutor 类的 structure 里找 newPromise() 方法,
发现是继承 AbstractEventExecutor 类的方法

1
2
3
4
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}

后面看看 Netty 具体是如何实现 Promise 的。

AbstractFuture

io.netty.util.concurrent.AbstractFuture
组合使用 await(); cause(); getNow() 方法,实现了 get() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractFuture<V> implements Future<V> {

@Override
public V get() throws InterruptedException, ExecutionException {
await();

Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}

DefaultPromise

io.netty.util.concurrent.DefaultPromise
主要有两方面:

  1. listener 的添加及通知(addListener、notifyListeners)
  2. 线程的等待及唤醒:DefaultPromise 底层,是通过 Object#wait()、Object#notifyAll() 实现的,锁是当前 DefaultPromise 实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

// listener 的 operationComplete 方法中,可以再次使用 future.addListener() 继续添加 listener
// 可以嵌套的Listener的最大层数,可见最大值为8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));

// 使用底层 Unsafe,保证更新 volatile Object result 操作的原子性(volatile 只能保证可见行)
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

private static final Object SUCCESS = new Object();
private static final Object UNCANCELLABLE = new Object();
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));

private volatile Object result;

// listeners 是 Object 类型:之所以这样设计,是因为大多数情况下listener只有一个,用集合/数组会造成浪费
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
private Object listeners;
/**
* 阻塞等待该结果的线程数
* Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
*/
private short waiters;
/**
* 通知正在进行标识
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the executor changes.
*/
private boolean notifyingListeners;

/**
* Get the executor used to notify listeners when this promise is complete.
*/
private final EventExecutor executor;

protected EventExecutor executor() {
return executor;
}

public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}

// only for subclasses
protected DefaultPromise() {
executor = null;
}

// ========== add/remove listener

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");

// 线程安全地添加 listener
synchronized (this) {
addListener0(listener);
}

if (isDone()) {
notifyListeners();
}

return this;
}

@Override
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");

// 线程安全地删除 listener
synchronized (this) {
removeListener0(listener);
}

return this;
}

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}

private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
}

// ========== set status/result

@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}

@Override
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}

private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}

@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}

@Override
public boolean tryFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return true;
}
return false;
}

private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}

private boolean setValue0(Object objResult) {
// 当前结果为 null 或 UNCANCELLABLE 时,才可设置,一旦设置 OK,后续再设置肯定不成功
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}

// synchronized 持有当前 DefaultPromise 实例锁,与 wait() 一致
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
// 确实有 waiting 等待的线程,才 notifyAll
notifyAll();
}
}

@Override
public boolean setUncancellable() {
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
return !isDone0(result) || !isCancelled0(result);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
checkNotifyWaiters();
notifyListeners();
return true;
}
return false;
}

// ========== get status/result

@Override
public boolean isSuccess() {
Object result = this.result;
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}

@Override
public boolean isCancellable() {
return result == null;
}

@Override
public Throwable cause() {
Object result = this.result;
return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
}

@Override
public boolean isCancelled() {
return isCancelled0(result);
}

@Override
public boolean isDone() {
return isDone0(result);
}

private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}

private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}

@Override
public V getNow() {
Object result = this.result;
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}

// ========== sync/await

@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}

private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}

@Override
public Promise<V> await() throws InterruptedException {
// 异步操作已经完成,直接返回
if (isDone()) {
return this;
}

if (Thread.interrupted()) {
throw new InterruptedException(toString());
}

// 死锁检测: 不能在执行异步操作的线程里,执行 await() 方法
checkDeadLock();

// 持有当前 DefaultPromise 实例锁,然后把当前线程添加到锁的 wait-set 中
synchronized (this) {
// 使用 while 不用 if,是为了防止被意外唤醒(其它地方的 notifyAll)
// while 保证唤醒后继续判断 wait 条件 !isDone()
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}

// 不能在执行异步操作的线程里,执行 await() 方法
// 检查当前调用方法的线程是不是执行器的线程如果是则说明发生了死锁需要抛出异常停止死锁操作
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}

private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}

private void decWaiters() {
--waiters;
}

// ========== notify listeners

private void notifyListeners() {
EventExecutor executor = executor();

// 设置的用于 notify listeners 的线程,就是当前异步操作的线程,直接执行
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 执行前增加嵌套层数
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 执行完毕,无论成功与否,都要回滚嵌套层数
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}

// 设置的用于 notify listeners 的线程,是其它线程,则将它加到该线程的执行任务列表中
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

private void notifyListenersNow() {
Object listeners;
// 其它线程可能会执行添加 listener 操作
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method,
// so setting notifyingListeners back to false does not need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}

private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}


private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}

private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}
}

ChannelFuture、ChannelPromise

ChannelFuture、ChannelPromise 扩展,主要是为了获取关联的 Channel 对象,用于异步处理 Channel 中的事件。

ChannelFuture

io.netty.channel.ChannelFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface ChannelFuture extends Future<Void> {

/**
* Returns a channel where the I/O operation associated with this future takes place.
*/
Channel channel();

/**
* Returns {@code true} if this {@link ChannelFuture} is a void future
* and so not allow to call any of the following methods:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #await()}</li>
* <li>{@link #sync()}</li>
* </ul>
*/
boolean isVoid();

// 覆写了 Future 部分方法,返回值限定为 ChannelFuture

@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelFuture sync() throws InterruptedException;

@Override
ChannelFuture await() throws InterruptedException;

}

ChannelPromise

io.netty.channel.ChannelPromise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
ChannelPromise unvoid();

ChannelPromise setSuccess();

boolean trySuccess();

@Override
Channel channel();

// 覆写了 ChannelFuture, Promise<Void> 部分方法,返回值限定为 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise await() throws InterruptedException;
}

示例

以最常见的关闭等待操作为例

1
2
3
4
// Bind and start to accept incoming connections.
ChannelFuture channelFuture = serverBoot.bind("127.0.0.1", PORT).sync();
// Wait until the server socket is closed.
channelFuture.channel().closeFuture().sync();

  1. channelFuture.channel() 获取关联的 channel

  2. NioServerSocketChannel 的 closeFuture(),继承的 AbstractChannel 方法

    1
    2
    3
    4
    5
    6
    private final CloseFuture closeFuture = new CloseFuture(this);

    @Override
    public ChannelFuture closeFuture() {
    return closeFuture;
    }
  3. 最后执行的实际是 CloseFuture#sync(),当前线程 (main) 阻塞在 CloseFuture 上
    看看线程信息,确实是这样的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    $ jps
    64928 EchoServer

    $ jstack -l 64928
    ...
    "main" #1 prio=5 os_prio=31 tid=0x00007fef4f809000 nid=0x1803 in Object.wait() [0x000070000bc30000]
    java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007960ff6f0> (a io.netty.channel.AbstractChannel$CloseFuture)
    at java.lang.Object.wait(Object.java:502)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:231)
    - locked <0x00000007960ff6f0> (a io.netty.channel.AbstractChannel$CloseFuture)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:337)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
    at org.wangming.demo.netty.echo.EchoServer.main(EchoServer.java:56)

    Locked ownable synchronizers:
    - None
    ...

那么,什么时候 main 线程会被唤醒继续执行呢?

1
NioServerSocketChannel#close() -> DefaultChannelPipeline#close() -> TailContext#close() -> HeadContext#close() -> AbstractUnsafe#close(...)

io.netty.channel.AbstractChannel.AbstractUnsafe#close(channelPromise)

1
2
3
4
5
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}

io.netty.channel.AbstractChannel.AbstractUnsafe#close(…)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// ...

// Close the channel and fail the queued messages in all cases.
doClose0(promise);

// ...
}

private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

CloseFuture#setClosed() 会执行 trySuccess(),这样就会在 CloseFuture 对象上执行 notifyAll,唤醒 main 线程。

1
2
3
boolean setClosed() {
return super.trySuccess();
}

其它异步编程框架

  1. JDK 8 新加入的 CompletableFuture
    实现了分阶段的异步编程,使分阶段的异步调用扁平化(类似于 ES6 的 Promise),避免了 pyramid of doom(代码嵌套层次太深、可读性差)
  2. RxJava
  3. Guava ListenableFuture

参考

本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。