摘要:
Netty 中的所有 IO 操作都是异步的,也就是说任何 IO 调用都将立即返回,返回的 ChannelFuture 实例会在操作完成时,通知用户该操作的状态、结果。
异步调用
我们经常听到同步、异步,为了理解,通常都是通过举例来说明。
比如,
同步调用,好比你在包子店点了一笼小笼包,一直在柜台等到小笼包蒸熟,然后开始吃;
异步调用,好比你在包子店点了一笼小笼包,老板给了你一个小票(凭证),你拿着小票可以去干别的事情,等到差不多时候,你 自己主动 拿着小票去取小笼包来吃(Future),当然还有一种更好的方式是,你的小笼包蒸熟后,老板直接电话 通知 你凭小票来取小笼包(Promise)。
但是好像很少有人给出它们的定义,举例有助于概念理解,定义有助于精确理解。
下面是我个人的一些理解。
什么是同步调用?
- 调用一个接一个执行,上一个调用函数执行完成,下一个调用才能开始执行
- 好处是简单,与人的日常思维习惯相符
那什么是异步调用(asynchronous call)?
- 调用方无须等待调用的函数执行完成,就可继续向下执行
- 好处是适用场景下(阻塞任务、不关心执行结果,处理不确定事件)会非常高效,可以显著提升性能
异步调用与阻塞
- 异步调用与阻塞是两回事
- 大多数情况下,我们使用异步调用,是为了规避阻塞带来的性能降低。
- 非阻塞的调用,我们照样可以使用异步调用,比如统计操作信息不阻塞,我们不关心操作结果,照样使用异步调用实现
异步调用与多线程
- 异步调用与多线程,也是两回事
- 异步调用,绝大多数是通过多线程实现的,方便、逻辑简单
- 单线程照样可以实现异步调用,一般会通过任务队列管理机制实现单线程的异步调用
异步调用与MQ(消息队列)
- 异步调用,原理上与 MQ 是相同的:一个异步调用就相当于发出一个执行特定任务的消息,消息发出去就可以了,无需等到任务完成
- 类似于生产者-消费者模型,通过中间队列,将生产者和消费者关系解耦
- MQ 是进程维度异步调用的一种实现方式
异步调用与时序
- 异步调用,跟时序也是两回事,异步调用需不需要保证时序,要看业务需要
- 业务常常要求具有某种有序关系的异步调用
异步调用与事件驱动
- 事件驱动机制,常常应用于网络编程、前端(桌面 GUI/Web 浏览器: 键盘输入、鼠标点击/移动 …)领域:事件监听器 Listener、事件调度/分发器 Mediator/Dispatcher、事件处理器 Handler
- 事件驱动机制,底层都是使用的异步调用,接收到事件,异步处理事件
- 通过事件驱动机制,异步调用可以很方便实现一串时序逻辑
- 异步调用与事件驱动结合的系统,会表现出一种特殊的、极具价值的行为:系统可以以任意的顺序响应在任意的时间点产生的事件。(引用自 《Netty in Action.v5》)
Netty 是异步的
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
官网介绍,Netty 是异步的。
- 一方面,Netty 使用异步 IO 模型,或者更精确地说是同步非阻塞 IO。这个指的 IO 系统调用层面,IO 读写等待就绪后,才会执行读写,不会阻塞线程,已经在 NIO 里详细说了;
- 另一方面,Netty 中的所有 IO 操作都是异步的。这个指的是用户调用层面,也就是说任何 IO 调用都将立即返回,返回的 ChannelFuture 实例会在操作完成时,通知用户该操作的状态、结果。
Netty Future 体系
Netty 异步 IO 操作,使用 Future、Promise 实现。
Future、Promise 都可以理解为未来任务的一种凭证(期权?),将来任务完成,总是可以通过这个凭证,拿到任务的状态、结果。
Netty 模块化做的特别好,Future、Promise 功能,统一封装在 netty-common 模块里(io.netty.util.concurrent 包),可以独立作为通用库使用。
JDK Future
java.util.concurrent.Future
1 | /** |
JDK Future 示例
1 | public class FutureDemo { |
JDK Future 缺点
仔细思考 JDK Future,会发现两个问题:
只有一个 isDone() 方法,判断异步操作是否完成,但是对于完成的定义过于模糊
正常终止、抛出异常、用户取消,都会使 isDone() 返回 true
实际使用中,常常需要对这三种情况分别处理需要你主动获取操作状态、结果,检查操作是否已经完成,或者一直阻塞直到它完成
对于一个异步操作,我们更关心的是这个异步操作结束后能否再执行一系列动作。
如果是主动获取(拉模式),很难精确掌握获取结果的时间,拉取早了,当前线程阻塞,晚了,后续操作又不及时;
最好是,异步任务执行完了,通知用户(推模式),以便用户继续后面操作。
正是因为 JDK Future 存在上面两个问题,Netty 扩展了自己的 Future。
Future
io.netty.util.concurrent.Future
1 | /** |
Netty Future 解决了 JDK Future 的两个问题:
Future 有两种状态 Uncompleted、Completed,其中 Completed 又有三种状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16+---------------------------+
| Completed successfully |
+---------------------------+
+----> isDone() = true |
+--------------------------+ | | isSuccess() = true |
| Uncompleted | | +===========================+
+--------------------------+ | | Completed with failure |
| isDone() = false | | +---------------------------+
| isSuccess() = false |----+----> isDone() = true |
| isCancelled() = false | | | cause() = non-null |
| cause() = null | | +===========================+
+--------------------------+ | | Completed by cancellation |
| +---------------------------+
+----> isDone() = true |
| isCancelled() = true |
+---------------------------+addListener(listener),可以添加操作完成后的回调操作
await()、sync(),可以以阻塞的方式等待异步操作完成
Future 是只读的 read-only,操作的状态、结果是不可变的,只能读取,不能修改,因此 Netty 还提供了可写的 Future,也即 Promise。
Future 示例
1 | public class FutureDemo { |
Promise
io.netty.util.concurrent.Promise
1 | /** |
Promise 示例
1 | public class PromiseDemo { |
先看看 Promise promise = executor.newPromise();
DefaultEventExecutor 类的 structure 里找 newPromise()
方法,
发现是继承 AbstractEventExecutor 类的方法1
2
3
4
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
后面看看 Netty 具体是如何实现 Promise 的。
AbstractFuture
io.netty.util.concurrent.AbstractFuture
组合使用 await(); cause(); getNow() 方法,实现了 get() 方法
1 | public abstract class AbstractFuture<V> implements Future<V> { |
DefaultPromise
io.netty.util.concurrent.DefaultPromise
主要有两方面:
- listener 的添加及通知(addListener、notifyListeners)
- 线程的等待及唤醒:DefaultPromise 底层,是通过 Object#wait()、Object#notifyAll() 实现的,锁是当前 DefaultPromise 实例。
1 | public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { |
ChannelFuture、ChannelPromise
ChannelFuture、ChannelPromise 扩展,主要是为了获取关联的 Channel 对象,用于异步处理 Channel 中的事件。
ChannelFuture
io.netty.channel.ChannelFuture
1 | public interface ChannelFuture extends Future<Void> { |
ChannelPromise
io.netty.channel.ChannelPromise
1 | /** |
示例
以最常见的关闭等待操作为例1
2
3
4// Bind and start to accept incoming connections.
ChannelFuture channelFuture = serverBoot.bind("127.0.0.1", PORT).sync();
// Wait until the server socket is closed.
channelFuture.channel().closeFuture().sync();
channelFuture.channel()
获取关联的 channelNioServerSocketChannel 的
closeFuture()
,继承的 AbstractChannel 方法1
2
3
4
5
6private final CloseFuture closeFuture = new CloseFuture(this);
public ChannelFuture closeFuture() {
return closeFuture;
}最后执行的实际是 CloseFuture#sync(),当前线程 (main) 阻塞在 CloseFuture 上
看看线程信息,确实是这样的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22$ jps
64928 EchoServer
$ jstack -l 64928
...
"main" #1 prio=5 os_prio=31 tid=0x00007fef4f809000 nid=0x1803 in Object.wait() [0x000070000bc30000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007960ff6f0> (a io.netty.channel.AbstractChannel$CloseFuture)
at java.lang.Object.wait(Object.java:502)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:231)
- locked <0x00000007960ff6f0> (a io.netty.channel.AbstractChannel$CloseFuture)
at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:337)
at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
at org.wangming.demo.netty.echo.EchoServer.main(EchoServer.java:56)
Locked ownable synchronizers:
- None
...
那么,什么时候 main 线程会被唤醒继续执行呢?1
NioServerSocketChannel#close() -> DefaultChannelPipeline#close() -> TailContext#close() -> HeadContext#close() -> AbstractUnsafe#close(...)
io.netty.channel.AbstractChannel.AbstractUnsafe#close(channelPromise)1
2
3
4
5
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
io.netty.channel.AbstractChannel.AbstractUnsafe#close(…)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// ...
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
// ...
}
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
CloseFuture#setClosed() 会执行 trySuccess(),这样就会在 CloseFuture 对象上执行 notifyAll,唤醒 main 线程。1
2
3boolean setClosed() {
return super.trySuccess();
}
其它异步编程框架
- JDK 8 新加入的 CompletableFuture
实现了分阶段的异步编程,使分阶段的异步调用扁平化(类似于 ES6 的 Promise),避免了 pyramid of doom(代码嵌套层次太深、可读性差) - RxJava
- Guava ListenableFuture
参考
本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。