Netty Pipeline

摘要:

netty 源码版本 io.netty:netty-all:4.1.32.Final

Channel、ChannelPipeline、ChannelHandlerContext、ChannelHandler

Channel

AbstractChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;

    /**
     * Builds the channel together with its id, its {@code Unsafe} and its pipeline.
     * Each channel therefore owns exactly one pipeline, created once at construction time.
     *
     * @param parent the parent channel, or {@code null} when this channel has no parent
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // Creation order is kept as id -> unsafe -> pipeline.
        this.id = newId();
        this.unsafe = newUnsafe();
        this.pipeline = newChannelPipeline();
    }

    /**
     * Factory hook for the pipeline; the default implementation binds a new
     * {@link DefaultChannelPipeline} to this channel.
     *
     * @return the freshly created pipeline
     */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

    /**
     * @return the pipeline that was created in the constructor
     */
    @Override
    public ChannelPipeline pipeline() {
        return this.pipeline;
    }

    // ...
}

ChannelPipeline

ChannelHandlerContext

ChannelHandler

Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline.

Channel-Pipeline-Context-Handler 关系

Channel-Pipeline-Context-Handler

Channel 和 ChannelPipeline 是一对一的关联关系,而 ChannelPipeline 内部的多个 ChannelHandlerContext 形成了双向链表(链表的头是 HeadContext, 链表的尾是 TailContext),Context 只是对 ChannelHandler 的封装。

Servlet 的 Filter
Spring MVC 的 Interceptor

ChannelPipeline 添加 ChannelHandler

示例

1
2
3
4
5
6
7
8
9
10
// Assemble the channel's handler chain; each addLast appends the handler at the end of the pipeline.
ch.pipeline()
// Decoders: split the input into frames on a line delimiter, with the frame length capped
// by Commons.MAX_FRAME_LENGTH ...
.addLast("frameDecoder", new DelimiterBasedFrameDecoder(Commons.MAX_FRAME_LENGTH, Delimiters.lineDelimiter()))
// ... then decode each frame (presumably into a String — confirm against Commons.STRING_DECODER).
.addLast("stringDecoder", Commons.STRING_DECODER)
// Encoder for outgoing messages.
.addLast("stringEncoder", Commons.STRING_ENCODER)
// Logging handler: intended for dev/test environments only, NOT for production.
.addLast(Commons.LOGGING_HANDLER_INFO)
// Business-logic ChannelHandler, registered with an explicit executor group
// (BUSINESS_EVENT_LOOP_GROUP) instead of the default one.
.addLast(BUSINESS_EVENT_LOOP_GROUP, "echoServerHandler", ECHO_SERVER_HANDLER);

DefaultChannelPipeline#addLast

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Appends {@code handler} to the end of this pipeline (directly before the tail context)
 * and arranges for the handler-added callback ({@code callHandlerAdded0}) to run.
 *
 * <p>The handler is wrapped in a new context and linked into the list while holding this
 * pipeline's monitor. The callback is then dispatched in one of three ways:
 * <ul>
 *   <li>channel not registered yet: the context is marked add-pending and the callback is
 *       deferred via {@code callHandlerCallbackLater};</li>
 *   <li>registered, but the caller is not on the context's executor: the context is marked
 *       add-pending and the callback is submitted to that executor;</li>
 *   <li>otherwise: the callback is invoked directly, after leaving the synchronized block.</li>
 * </ul>
 *
 * @param group   passed to {@code newContext(...)} when creating the handler's context
 * @param name    requested handler name; it is passed through {@code filterName(name, handler)}
 * @param handler the handler to append
 * @return this pipeline, to allow call chaining
 */
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// NOTE(review): presumably rejects a non-sharable handler that was already added — confirm in checkMultiplicity.
checkMultiplicity(handler);

// Wrap the handler in a new context node.
newCtx = newContext(group, filterName(name, handler), handler);

// Link the new node into the doubly linked list, directly before tail.
addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

// Registered, but the calling thread is not the context's executor:
// hand the callback over to that executor instead of running it here.
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// Already on the context's executor: run the callback directly, outside the synchronized block.
callHandlerAdded0(newCtx);
return this;
}

/**
 * Splices {@code newCtx} into the doubly linked context list directly in front of
 * {@code tail}, i.e. after the node that used to be last.
 *
 * @param newCtx the context node to link in
 */
private void addLast0(AbstractChannelHandlerContext newCtx) {
    final AbstractChannelHandlerContext formerLast = tail.prev;
    // First point the new node at its two neighbours ...
    newCtx.prev = formerLast;
    newCtx.next = tail;
    // ... then make the neighbours point back at the new node.
    formerLast.next = newCtx;
    tail.prev = newCtx;
}

我们知道 AbstractChannelHandlerContext 中有 inbound 和 outbound 两个 boolean 变量, 分别用于标识 Context 所对应的 handler 的类型, 即:

inbound 为真时, 表示对应的 ChannelHandler 实现了 ChannelInboundHandler 接口.

outbound 为真时, 表示对应的 ChannelHandler 实现了 ChannelOutboundHandler 接口.

注意, 如果我们捕获了一个事件, 并且想让这个事件继续传递下去, 那么需要调用 Context 相应的传播方法.

pipeline 中的 inbound 事件传播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// DefaultChannelPipeline#fireChannelActive
/**
 * Starts propagation of the channelActive event at the pipeline's {@code head} context.
 *
 * @return this pipeline, to allow call chaining
 */
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}

// AbstractChannelHandlerContext#invokeChannelActive(AbstractChannelHandlerContext)
// Static entry point: runs the invoker method of the context passed in, on that context's
// executor — directly when the caller is already on it, otherwise as a submitted task.
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    final EventExecutor loop = next.executor();
    if (!loop.inEventLoop()) {
        // Called from another thread: hand the invocation over to the context's executor.
        loop.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
        return;
    }
    next.invokeChannelActive();
}

// Invoker method: calls the real channelActive(...) of the handler held by this context.
private void invokeChannelActive() {
    if (!invokeHandler()) {
        // This context's handler is not to be invoked: skip it and pass the event on.
        fireChannelActive();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelActive(this);
    } catch (Throwable cause) {
        // Anything thrown by the handler is reported rather than rethrown to the caller.
        notifyHandlerException(cause);
    }
}

// DefaultChannelPipeline.HeadContext#channelActive
/**
 * Head node of the pipeline's context list. It is a context and, at the same time, both an
 * outbound and an inbound handler, so it serves as its own handler.
 */
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

// NOTE(review): the two boolean arguments presumably are the inbound/outbound flags — confirm
// against the AbstractChannelHandlerContext constructor.
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}

// The handler of HeadContext is the HeadContext itself.
@Override
public ChannelHandler handler() {
return this;
}

// Key point: the next node is only reached because ctx.fireChannelActive() is called here.
// A handler that wants the event to travel further down the pipeline must call the
// context's matching fire*** method.
// NOTE(review): readIfIsAutoRead() presumably triggers a read when auto-read is enabled — confirm.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}

// ...
}

// AbstractChannelHandlerContext#fireChannelActive
// Looks up the next inbound node and runs invokeChannelActive(next) on it.
// Every handler that calls fireChannelActive() re-enters this method, so the calls nest
// node by node until the last inbound node, i.e. the tail node, is reached.
@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext nextInbound = findContextInbound();
    invokeChannelActive(nextInbound);
    return this;
}

// Walks forward along the doubly linked list, starting at this node's successor,
// and returns the first node whose inbound flag is set.
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext candidate = this.next;
    while (!candidate.inbound) {
        candidate = candidate.next;
    }
    return candidate;
}


// DefaultChannelPipeline.TailContext#channelActive
// The tail node only delegates to onUnhandledInboundChannelActive(), whose body is empty,
// and does not call any fire*** method, so the event's propagation ends at this node.
// (Most tail-node methods are reportedly similar no-ops whose role is to end propagation.)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
onUnhandledInboundChannelActive();
}

/**
 * Hook called from TailContext#channelActive, i.e. when a channelActive event has travelled
 * through the pipeline all the way to the tail node. The body is intentionally empty: the
 * event is simply dropped. Being {@code protected}, it can be overridden by subclasses.
 */
protected void onUnhandledInboundChannelActive() {
}

同理,可以分析所有 inbound 事件的传播,正常情况下,即用户如果不覆盖每个节点的事件传播操作,所有 inbound 事件最后都落到 Tail 节点上。

pipeline 中的 outbound 事件传播

参考

本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。