netty在服务端端口绑定和新连接建立得过程中会建立相应得channel,而与channel得动作密切相关的是pipeline这个概念,pipeline可以看作是一条流水线,字节流输入,加工输出;
pipeline结构
pipeline实际上就是一个由Context组成的双向链表,头部为HeadContext,尾部为TailContext
pipeline 默认结构
每个节点都是一个__ChannelHandlerContext__对象,每个context节点保存了它包裹的执行器__ChannelHandler__执行操作所需要的上下文
pipeline 初始化
pipeline初始化通过AbstractChannel(Channel parent)构造函数->newChannelPipeline()->new DefaultChannelPipeline(this);
| DefaultChannelPipeline 构造函数
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
pipeline 添加节点
常见的客户端代码
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});
- 输入
- spliter将来源TCP数据包拆包
- 对包进行decoder
- 传入业务处理器BusinessHandler
- 业务处理完encoder
- 输出
此时pipeline结构
这里通过两种颜色区分pipeline中两种不同类型的节点,一个是__ChannelInboundHandler__,处理inBound事件,最典型的就是读取数据流,加工处理;还有一种类型的Handler是__ChannelOutboundHandler__, 处理outBound事件,比如当调用__writeAndFlush()__类方法时,就会经过该种类型的handler
判断__Inbound__还是__Outbound__
通过__instanceof__关键字根据接口类型判断
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
| DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.检查是否有重复handler
checkMultiplicity(handler);
// 2.创建节点
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加节点
addLast0(newCtx);
}
// 4.回调用户方法
callHandlerAdded0(handler);
return this;
}
这里着重记录添加节点,如果对1和2步骤的感兴趣的可以看原文链接
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1
newCtx.next = tail; // 2
prev.next = newCtx; // 3
tail.prev = newCtx; // 4
}
添加完节点之后,回调用户方法
pipeline删除节点
1.找到待删除节点 2.调整双向链表指针删除 3.回调用户函数
| DefaultChannelPipeline
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
删除操作
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 2.调整双向链表指针删除
remove0(ctx);
}
// 3.回调用户函数
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next; // 1
next.prev = prev; // 2
}