netty源码分析之pipeline[一]-笔记

Dcr 1年前 ⋅ 1306 阅读

netty在服务端端口绑定和新连接建立得过程中会建立相应得channel,而与channel得动作密切相关的是pipeline这个概念,pipeline可以看作是一条流水线,字节流输入,加工输出;

pipeline结构

pipeline实际上就是一个由Context组成的双向链表,头部为HeadContext,尾部为TailContext

1-1.png pipeline 默认结构

每个节点都是一个__ChannelHandlerContext__对象,每个context节点保存了它包裹的执行器__ChannelHandler__执行操作所需要的上下文

pipeline 初始化

pipeline初始化通过AbstractChannel(Channel parent)构造函数->newChannelPipeline()->new DefaultChannelPipeline(this);

| DefaultChannelPipeline 构造函数

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

pipeline 添加节点

常见的客户端代码

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
  • 输入
  • spliter将来源TCP数据包拆包
  • 对包进行decoder
  • 传入业务处理器BusinessHandler
  • 业务处理完encoder
  • 输出

此时pipeline结构

1-2.png

这里通过两种颜色区分pipeline中两种不同类型的节点,一个是__ChannelInboundHandler__,处理inBound事件,最典型的就是读取数据流,加工处理;还有一种类型的Handler是__ChannelOutboundHandler__, 处理outBound事件,比如当调用__writeAndFlush()__类方法时,就会经过该种类型的handler

判断__Inbound__还是__Outbound__

通过__instanceof__关键字根据接口类型判断

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

| DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkMultiplicity(handler);
        // 2.创建节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点
        addLast0(newCtx);
    }
   
    // 4.回调用户方法
    callHandlerAdded0(handler);
    
    return this;
}

这里着重记录添加节点,如果对1和2步骤的感兴趣的可以看原文链接

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}

1-3.png

添加完节点之后,回调用户方法

pipeline删除节点

1.找到待删除节点 2.调整双向链表指针删除 3.回调用户函数

| DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

删除操作

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.调整双向链表指针删除
        remove0(ctx);
    }
    // 3.回调用户函数
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

1-4.png

参考文章

简书闪电侠 netty源码分析之pipeline(一)

全部评论: 0

    我有话说: