NIO 概览

摘要:
IO模型、NIO 示例、Reactor、优劣

Unix/Linux IO

服务端处理网络请求

服务端处理网络请求的典型过程:
服务端处理网络请求流程图

一个输入操作通常包括两个不同的阶段:

  • 等待数据准备好
  • 从内核向用户进程复制数据

第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中的某个缓冲区。
第二步就是把数据从内核缓冲区复制到应用进程缓冲区

Intel CPU 提供Ring0-Ring3四种级别的运行模式,Ring0级别最高,Ring3最低。Linux使用了Ring3级别运行用户态,Ring0作为内核态。Ring3状态不能访问Ring0的地址空间,包括代码和数据。因此用户态是没有权限去操作内核态的资源的,它只能通过系统调用外完成用户态到内核态的切换,然后在完成相关操作后再有内核态切换回用户态。

应用程序在系统调用完成上面2步操作时,调用方式的阻塞、非阻塞,操作系统在处理应用程序请求时处理方式的同步、异步处理的不同,参考 《UNIX网络编程.卷1》,可以分为5种I/O模型

IO模型

1. 阻塞式I/O模型(blocking I/O)

blocking-io

在阻塞式I/O模型中,应用程序在从调用recvfrom开始到它返回有数据报准备好这段时间是阻塞的,recvfrom返回成功后,应用进程开始处理数据报。

比喻 一个人在钓鱼,当没鱼上钩时,就坐在岸边一直等
优点 线程模型简单,方便开发
缺点 每个连接需要独立的进程/线程单独处理,当并发请求量大时为了维护程序,内存、线程频繁切换开销较大

2. 非阻塞式I/O模型(non-blocking I/O)

nonblocking-io

在非阻塞式I/O模型中,应用程序把一个套接字设置为非阻塞就是告诉内核,当所请求的I/O操作无法完成时,不要将进程睡眠,而是返回一个错误,应用程序基于I/O操作函数将不断的轮询数据是否已经准备好,如果没有准备好,继续轮询,直到数据准备好为止。

比喻 边钓鱼边玩手机,隔会再看看有没有鱼上钩,有的话就迅速拉杆
优点 不会阻塞在内核的等待数据过程,每次发起的I/O请求可以立即返回,不用阻塞等待,实时性较好
缺点 轮询将会不断地询问内核,这将占用大量的CPU时间,系统资源利用率较低,所以一般情况不会使用这种I/O模型

3. I/O复用模型(I/O multiplexing)

multiplexing-io

在I/O复用模型中,select 系统调用也会使进程阻塞,但是和阻塞I/O所不同的的,阻塞在 select 系统调用上,等待多个套接字中的任一个变为可读,而不是阻塞在真正的 I/O 系统调用 recvfrom 上。

比喻 放了一堆鱼竿,在岸边一直守着这堆鱼竿,直到有鱼上钩
优点 可以基于一个阻塞对象,同时在多个描述符上等待就绪,而不是使用多个线程(每个文件描述符一个线程),这样可以大大节省系统资源
缺点 当连接数较少时效率相比多线程+阻塞I/O模型效率较低,可能延迟更大,因为单个连接处理需要2次系统调用 (select + recvfrom),占用时间会有增加

疑问 ?
I/O 复用模型,I/O 本身的调用是非阻塞的吧!

scalable I/O event notification mechanism:

  • POSIX: select, poll
  • Linux: epoll
  • FreeBSD: kqueue (macOS)

select:上世纪 80 年代就实现了,它支持注册 FD_SETSIZE(1024) 个 socket,在那个年代肯定是够用的,现在肯定是不行的。

poll:1997 年,出现了 poll 作为 select 的替代者,最大的区别就是,poll 不再限制 socket 数量。

select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。所以,后来才催生了以下实现。

epoll:2002 年随 Linux 内核 2.5.44 发布,epoll 能直接返回具体的准备好的通道,时间复杂度 O(1)。

除了 Linux 中的 epoll,2000 年 FreeBSD 出现了 Kqueue,还有就是,Solaris 中有 /dev/poll。

Windows 平台的非阻塞 IO 使用的是 select,但是 Windows 中 IOCP 提供的异步 IO 非常强大。

4. 信号驱动式I/O模型(signal-driven I/O) [ˈdrɪvn]

signal-driven-io

应用程序首先开启套接字的信号驱动式I/O功能,并通过sigaction系统调用安装一个信号处理函数,该系统调用将立即返回,进程继续运行并不阻塞。当数据准备好时,进程会收到内核产生的一个SIGIO信号,随后可以在信号处理函数中调用I/O操作函数 recvfrom 读取数据。

比喻 鱼竿上系了个铃铛,当铃铛响,就知道鱼上钩,然后可以专心玩手机

5. 异步I/O模型(asynchronous I/O)

asyn-io

由POSIX规范定义,应用程序告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到应用程序自己的缓冲区)完成后通知应用程序。这种模型与信号驱动模型的主要区别在于:信号驱动I/O是由内核通知应用程序何时启动一个I/O操作,而异步I/O模型是由内核通知应用程序I/O操作何时完成。

优点 系统资源利用率高
缺点 要实现真正的异步 I/O,操作系统需要做大量的工作。目前 Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下,Linux2.6才引入,目前 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 IO复用模型模式为主。

5种I/O模型总结

io-comparison

POSIX 对同步I/O、异步I/O 这两个术语的定义如下:

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes. 同步I/O在直到 I/O 操作完成期间,会导致请求进程阻塞
  • An asynchronous I/O operation does not cause the requesting process to be blocked. 异步I/O在直到 I/O 操作完成期间,都不会导致请求进程阻塞

从上图中我们可以看出,可以看出,越往后,阻塞越少,理论上效率也是最优。其五种I/O模型中,前四种属于同步I/O,因为其中真正的I/O操作(recvfrom)将阻塞进程/线程,只有异步I/O模型才于POSIX定义的异步I/O相匹配。

I/O 复用模型 与 AIO 相比,就是多了一层从内核copy数据到应用空间的阻塞,从而不能算作asynchronous I/O类。但是,这层小小的阻塞无足轻重。

NIO

官方称之为 New IO,也有人称之为 Non-Blocking IO。
NIO 对应的是上面 I/O复用模型(I/O multiplexing)。
有些人喜欢将 NIO 称为 异步非阻塞 IO,但是如果严格按照 POSIX 定义,它并不是异步 IO。
不过也不必纠结于术语,知道其中的道理就好。

NIO vs IO

  • 标准 IO 基于 流 (Stream) 进行操作;NIO 是基于 通道 (Channel) 进行操作的。
  • Channel 是双向的,既可以写数据到通道,又可以从通道中读取数据,它能更好地反映出底层操作系统的真实情况(Linux 底层网络 IO 就是双向的);而流的读写只能是单向的,要么是输入流,要么是输出流,不能既是输入流又是输出流。
  • NIO能够实现阻塞/非阻塞的网络通信,而IO只能实现阻塞式的网络通信。

示例

NioServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class NioServer {

private static final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);

public static void main(String[] args) throws IOException {

// 创建 Selector (选择器/多路复用器)
Selector selector = Selector.open();

// 创建 ServerSocketChannel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 绑定本机监听端口
serverChannel.socket().bind(new InetSocketAddress(7788));
// 设置为非阻塞模式
serverChannel.configureBlocking(false);
// serverChannel 注册 OP_ACCEPT 事件到 selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.printf("[%s] Server startup!\n", Thread.currentThread().getName());

boolean stopped = false;
while (!stopped) {
int selectedNum = selector.select(1000);
if (selectedNum < 1) {
continue;
}

// 遍历 selectedKeys
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// key 处理了,要从 selectedKeys 中移除
iter.remove();

if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = server.accept();
// 设置为非阻塞模式
clientChannel.configureBlocking(false);
// 注册 OP_READ 事件到 selector
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.printf("[%s] accept a new client socket connection: %s\n",
Thread.currentThread().getName(), clientChannel.getRemoteAddress());

ByteBuffer writeBuffer = ByteBuffer.wrap("你好, I'm server!".getBytes(StandardCharsets.UTF_8));
clientChannel.write(writeBuffer);
}

if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
buffer.clear();
int bytesRead = clientChannel.read(buffer);
if (bytesRead > 0) {
String body = new String(buffer.array(), 0, bytesRead, StandardCharsets.UTF_8);
System.out.printf("[%s] 收到来自 client (%s) 的数据: %s\n",
Thread.currentThread().getName(), clientChannel.getRemoteAddress(), body);
buffer.flip();
clientChannel.write(buffer);

} else if (bytesRead == 0) {// 读到 0 字节, ignore

} else {// EOF
System.out.printf("[%s] client(%s) EOF! bytesRead=%s\n",
Thread.currentThread().getName(), clientChannel.getRemoteAddress(), bytesRead);
key.cancel();
clientChannel.close();
}
}
}
}

selector.close();
}
}

client

1
$ nc 127.0.0.1 7788

NioClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class NioClient {

public static void main(String[] args) throws IOException {

Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);

// If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation.
// If the connection is established immediately, as can happen with a local connection, then this method returns true.
// Otherwise this method returns false and the connection operation must later be completed by invoking the #finishConnect method.
boolean connected = socketChannel.connect(new InetSocketAddress("127.0.0.1", 7788));
System.out.printf("[%s] connected to 127.0.0.1:7788? %s\n", Thread.currentThread().getName(), connected);
if (connected) {
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}

boolean stopped = false;
while (!stopped) {
int selectedNum = selector.select(1000);
if (selectedNum < 1) {
continue;
}

Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();

if (key.isConnectable()) {
SocketChannel sc = (SocketChannel) key.channel();
boolean finished = sc.finishConnect();
if (finished) {
System.out.printf("[%s] %s -> %s: 连接成功.\n",
Thread.currentThread().getName(), sc.getLocalAddress(), sc.getRemoteAddress());
sc.register(selector, SelectionKey.OP_READ);
} else {
System.out.printf("[%s] %s -> %s: 连接失败!\n",
Thread.currentThread().getName(), sc.getLocalAddress(), sc.getRemoteAddress());
key.cancel();
sc.close();
}
}

if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
int bytesRead = clientChannel.read(buffer);
if (bytesRead > 0) {
String body = new String(buffer.array(), 0, bytesRead, StandardCharsets.UTF_8);
System.out.printf("[%s] 收到来自 server (%s) 的数据: %s\n",
Thread.currentThread().getName(), clientChannel.getRemoteAddress(), body);

String now = "你好, 现在时间: " + new Date();
ByteBuffer writeBuffer = ByteBuffer.wrap(now.getBytes(StandardCharsets.UTF_8));
clientChannel.write(writeBuffer);

} else if (bytesRead == 0) {// 读到 0 字节, ignore

} else {// EOF
System.out.printf("[%s] server(%s) EOF! bytesRead=%s\n",
Thread.currentThread().getName(), clientChannel.getRemoteAddress(), bytesRead);
key.cancel();
clientChannel.close();
}
}
}
}

selector.close();
}
}

NIO API

Buffer

一个 Buffer 本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据。
NIO 通道 只能与 Buffer 进行交互。数据总是从通道读取到缓冲区,或者从缓冲区写入到通道中。

属性:
capacity:容量,永远不会变化
limit:不应该被读或写的第一个元素的索引
position:下一个将要被读或写的元素的索引

Buffer-hierarchy.png

java.nio.Buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public abstract class Buffer {

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

public final int capacity() {
return capacity;
}

public final int position() {
return position;
}

public final Buffer position(int newPosition) {
if ((newPosition > limit) || (newPosition < 0))
throw new IllegalArgumentException();
position = newPosition;
if (mark > position) mark = -1;
return this;
}

public final int limit() {
return limit;
}

public final Buffer limit(int newLimit) {
if ((newLimit > capacity) || (newLimit < 0))
throw new IllegalArgumentException();
limit = newLimit;
if (position > limit) position = limit;
if (mark > limit) mark = -1;
return this;
}

public final Buffer mark() {
mark = position;
return this;
}

public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}

public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

public final int remaining() {
return limit - position;
}

public final boolean hasRemaining() {
return position < limit;
}

// ...
}

java.nio.ByteBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
public abstract class ByteBuffer extends Buffer
implements Comparable<ByteBuffer> {

// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers

ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset) {
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}

ByteBuffer(int mark, int pos, int lim, int cap) { // package-private
this(mark, pos, lim, cap, null, 0);
}

public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}

public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

public static ByteBuffer wrap(byte[] array, int offset, int length) {
try {
return new HeapByteBuffer(array, offset, length);
} catch (IllegalArgumentException x) {
throw new IndexOutOfBoundsException();
}
}

public static ByteBuffer wrap(byte[] array) {
return wrap(array, 0, array.length);
}

public abstract ByteBuffer slice();

public abstract ByteBuffer duplicate();

public abstract ByteBuffer compact();

public abstract boolean isDirect();

public abstract ByteBuffer asReadOnlyBuffer();


// -- Singleton get/put methods --

public abstract byte get();
public abstract ByteBuffer put(byte b);
public abstract byte get(int index);
public abstract ByteBuffer put(int index, byte b);


// -- Bulk get operations --

public ByteBuffer get(byte[] dst, int offset, int length) {
checkBounds(offset, length, dst.length);
if (length > remaining())
throw new BufferUnderflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
dst[i] = get();
return this;
}

public ByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}


// -- Bulk put operations --

public ByteBuffer put(ByteBuffer src) {
if (src == this)
throw new IllegalArgumentException();
int n = src.remaining();
if (n > remaining())
throw new BufferOverflowException();
for (int i = 0; i < n; i++)
put(src.get());
return this;
}

public ByteBuffer put(byte[] src, int offset, int length) {
checkBounds(offset, length, src.length);
if (length > remaining())
throw new BufferOverflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
this.put(src[i]);
return this;
}

public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}


// -- Other stuff --

public final boolean hasArray() {
return (hb != null) && !isReadOnly;
}

public final byte[] array() {
if (hb == null)
throw new UnsupportedOperationException();
if (isReadOnly)
throw new ReadOnlyBufferException();
return hb;
}

public final int arrayOffset() {
if (hb == null)
throw new UnsupportedOperationException();
if (isReadOnly)
throw new ReadOnlyBufferException();
return offset;
}

// -- Other byte stuff: Access to binary data --

boolean bigEndian // package-private
= true;
boolean nativeByteOrder // package-private
= (Bits.byteOrder() == ByteOrder.BIG_ENDIAN);

public final ByteOrder order() {
return bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
}

public final ByteBuffer order(ByteOrder bo) {
bigEndian = (bo == ByteOrder.BIG_ENDIAN);
nativeByteOrder =
(bigEndian == (Bits.byteOrder() == ByteOrder.BIG_ENDIAN));
return this;
}

public abstract int getInt();
public abstract ByteBuffer putInt(int value);
public abstract int getInt(int index);
public abstract ByteBuffer putInt(int index, int value);

public abstract char getChar();
public abstract ByteBuffer putChar(char value);
public abstract char getChar(int index);
public abstract ByteBuffer putChar(int index, char value);

public abstract short getShort();
public abstract ByteBuffer putShort(short value);
public abstract short getShort(int index);
public abstract ByteBuffer putShort(int index, short value);

public abstract long getLong();
public abstract ByteBuffer putLong(long value);
public abstract long getLong(int index);
public abstract ByteBuffer putLong(int index, long value);

public abstract float getFloat();
public abstract ByteBuffer putFloat(float value);
public abstract float getFloat(int index);
public abstract ByteBuffer putFloat(int index, float value);

public abstract double getDouble();
public abstract ByteBuffer putDouble(double value);
public abstract double getDouble(int index);
public abstract ByteBuffer putDouble(int index, double value);

}

Selector

SelectionKey

SocketChannel

ServerSocketChannel

Reactor

Event-driven processing pattern: Reactor, Proactor
Scalable IO in Java

Reactor

  • Reactor responds to IO events by dispatching the appropriate handler
  • Handlers perform non-blocking actions
  • Manage by binding handlers to events

在中,Reactor等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),然后把这个事件传给事先注册的Handler(事件处理函数或者回调函数),由后者来做实际的读写操作,其中的读写操作都需要应用程序同步操作,所以Reactor是非阻塞同步网络模型。

如果把I/O操作改为异步,即交给操作系统来完成能进一步提升性能,这就是异步网络模型Proactor。

single threaded version

single threaded version

  • Basic Reactor Design
    • single threaded version

示例即是这一种,单线程管天下

后面两种 Multithreaded Designs
Strategically add threads for scalability
Mainly applicable to multiprocessors

worker thread pools

worker thread pools

  • Worker Threads
    • Offload non-IO processing to speed up Reactor thread
      • Similar to POSA2 Proactor designs

multiple reactor threads

multiple reactor threads

  • Using Reactor Pools
    • Load-balance to match CPU and IO rates
    • Static or dynamic construction
      • Each with own Selector, Thread, dispatch loop
    • Main acceptor distributes load to other reactors

Java 原生 NIO 编程的缺点

  1. NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  2. 需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。
  3. 可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、粘包拆包、失败缓存、网络拥塞和异常码流的处理等问题,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐的工作量和难度都非常大。
  4. JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而已,它并没有被根本解决。该BUG以及与该BUG相关的问题单可以参见以下链接内容。
    http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933
    http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719

示例中,只是 NIO 编程的演示代码,缺少太多功能:

  • 没有实现 multiple reactor pattern
  • 无论是 client 还是 server 都不能主动发送消息
  • 没有编码、解码
  • 没有考虑粘包、拆包
  • 没有长连接心跳机制

大家可以试着完成这些功能,将有助于理解 netty 原理,因为有名的 netty 实际是基于 Java 原生 NIO 做了良好的封装,让 NIO 使用更方便、更省心、更健壮、功能更丰富、性能更好。

参考

本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。