netty源码分析之pipeline[二]笔记

Dcr 1年前 ⋅ 1182 阅读

主要内容

  1. netty中的Unsafe到底是干什么的
  2. pipeline中的head
  3. pipeline中的inBound事件传播
  4. pipeline中的tail
  5. pipeline中的outBound事件传播
  6. pipeline 中异常的传播

Unsafe到底是干什么的

  • Unsafe概念 官方解释 | Unsafe operations that should never be called from user-code. | These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

Unsafe在Channel定义,属于Channel的内部类

unsafe接口的所有方法

interface Unsafe {
   RecvByteBufAllocator.Handle recvBufAllocHandle();
   
   SocketAddress localAddress();
   SocketAddress remoteAddress();

   void register(EventLoop eventLoop, ChannelPromise promise);
   void bind(SocketAddress localAddress, ChannelPromise promise);
   void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
   void disconnect(ChannelPromise promise);
   void close(ChannelPromise promise);
   void closeForcibly();
   void beginRead();
   void write(Object msg, ChannelPromise promise);
   void flush();
   
   ChannelPromise voidPromise();
   ChannelOutboundBuffer outboundBuffer();
}

Unsafe继承结构

1-1.png

NioUnsafe在Unsafe的基础上拓展了以下几个接口

public interface NioUnsafe extends Unsafe {
    SelectableChannel ch();
    void finishConnect();
    void read();
    void forceFlush();
}

从增加的接口以及类名上来看,__NioUnsafe__增加了可以访问底层jdk的__SelectableChannel__的功能,定义了从__SelectableChannel__读取数据的__read__方法

__AbstractUnsafe__实现了大部分__Unsafe__的功能

AbstractNioUnsafe__主要是通过代理到其外部类__AbstractNioChannel__拿到了与jdk nio相关的一些信息,比如__SelectableChannel,__SelectionKey__等等

__NioSocketChannelUnsafe__和__NioByteUnsafe__放到一起讲,其实现了IO的基本操作,读,和写,这些操作都与jdk底层相关

NioMessageUnsafe__和__NioByteUnsafe__是处在同一层次的抽象,netty将一个新连接的建立也当作一个io操作来处理,这里的Message的含义我们可以当作是一个__SelectableChannel,读的意思就是accept一个SelectableChannel,写的意思是针对一些无连接的协议,比如UDP来操作的,我们先不用关注

Unsafe的分类

根据结构区分出两种类型的Unsafe

  • 与连接的字节数据读写相关的__NioByteUnsafe__
  • 与新连接建立操作相关的__NioMessageUnsafe__

NioByteUnsafe,NioMessageUnsafe 的读写都是通过委托到外部类NioSocketChannel实现

pipeline-head

head节点pipeline中第一个处理的IO事件,新连接接入和读事件在reactor线程的第二个步骤被检测到

| NioEventLoop

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     //新连接的已准备接入或者已存在的连接有数据可读
     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         unsafe.read();
     }
}

读操作直接依赖到unsafe来进行

| NioByteUnsafe

@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 创建ByteBuf分配器
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    do {
        // 分配一个ByteBuf
        byteBuf = allocHandle.allocate(allocator);
        // 将数据读取到分配的ByteBuf中去
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        if (allocHandle.lastBytesRead() <= 0) {
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            break;
        }

        // 触发事件,将会引发pipeline的读事件传播
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
    } while (allocHandle.continueReading());
    pipeline.fireChannelReadComplete();
}

NioByteUnsafe要做的事情可以简单的分为以下几个步骤

  1. 拿到channel的config之后拿到bytebuf分配器,用分配器来分配一个bytebuf,bytebuf是netty里面的字节数据载体,后面读取的数据都读到这个对象里面
  2. 将channel中的数据读取到bytebuf
  3. 数据读取完之后,调用__pipeline.fireChannelRead(byteBuf)__从head节点开始传播至整个pepeline

| DefaultChannelPipeline

final AbstractChannelHandlerContext head;
//...
head = new HeadContext(this);

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

这里可以看出来fireChannelRead会返回pipeline的上下文节点,拿到上下文节点就可以操作整个pipeline.

1-2.png

| HeadContext

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}

从head节点的实现继承关系来看既是ChannelHandlerContext,同时又是inBound和outBound handler

在传播读写事件的时候,head的功能只是简单的将事件传播下去,如__ctx.fireChannelRead(msg)__

在真正执行读写操作的时候,例如在调用__writeAndFlush()__等方法的时候,最终都会委托到unsafe执行,而当一次数据读完,channelReadComplete__方法首先被调用, TA要做的事情除了将事件继续传播下去之外,还得继续向reactor线程注册读事件,即调用__readIfIsAutoRead()

| HeadContext

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

| AbstractChannel

@Override
public Channel read() {
    pipeline.read();
    return this;
}

默认情况下,Channel都是默认开启自动读取模式的,既只要Channel是active的,读完一波数据之后就继续向selector注册读事件, 这样就可以连续不断的读取数据,最终通过pipeline传递到head节点

| HeadContext

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

委托到了NioByteUnsafe

@Override
public final void beginRead() {
    doBeginRead();
} 

| AbstractNioChannel

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

__doBeginRead()拿到处理过的__selectionKey,然后如果发现该selectionKey若在某个地方被移除readInterestOp操作,这里给他加上. 事实上,标准的netty程序不会走到这一行,只有在三次握手成功之后,如下方法被调用

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

才会将__readInterestOp__注册到SelectionKey上

总结一点,head节点的作用就是作为pipeline的头节点开始传递读写事件,调用unsafe进行实际的读写操作

pipeline中的inBound事件传播

__pipeline.fireChannelActive();最终会调用到__AbstractNioChannel.doBeginRead(),了解pipeline中的事件传播机制

| DefaultChannlpipeline

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

三次握手成功之后,__pipeline.fireChannelActice();__被调用,然后以head节点为参数,直接静态调用

| AbstractChannelHandlerContext

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

netty为了确保线程的安全性,将确保该操作在reactor线程中被执行,这里直接调用__HeadContext.fireChannelActive()__方法

| HeadContext

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

| AbstractChannelHandlerContext

public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    invokeChannelActive(next);
    return this;
}

调用__findContextInbound()__找到下一个inbound节点,由于当前pipeline的双向链表结构中既有inbound节点,又有outbound节点,看看源码如何找到下一个inBound节点

| AbstractChannelHandlerContext

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

netty寻找下一个inBound节点的过程是一个线性搜索的过程,通过遍历双向链表的下一个节点,知道下一个节点为inBound

找到下一个节点之后激活该节点,执行__invokeChannelActive(next);__,一个递归调用,直到最后一个inBound节点--tail节点

| TailContext

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }

Tail节点的ChannelActive方法为空方法结束调用.同理分析所有inBound事件传播,正常情况下,既用户不覆盖每个节点的事件传播操作,几乎所有事件都落到tail节点

pipeline中的tail节点

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // This may not be a configuration error and so don't log anything.
        // The event may be superfluous for the current pipeline configuration.
        ReferenceCountUtil.release(evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}

正如前面提到,tail节点的大部分作用既终止事件传播(方法体为空),除此之外,有两个重要的方法需要注意,__exceptionCaught()和__channelRead()

| exceptionCaught

protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

异常传播的机制和inBound事件传播机制一样,最终如果用户自定义节点没有处理的话,会落到tail节点,tail节点可不会简单吞下这个异常,而是发出警告

| channelRead

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

另外,tail节点在发现字节数据(ByteBuf)或者decoder之后的业务对象在pipeline流转过程中没有被消费,落到tail节点,tail节点也是发出警告,声明该消息已丢弃

pipeline中的outBound事件传播

这里以writeAndFlush操作看看pipeline中的outBound事件如何向外传播

典型的消息推送系统中,会有类似的一段代码

Channel channel = getChannel(userInfo);
channel.writeAndFlush(pushInfo);

这里跟进__channel.writeAndFlush__

| NioSocketChannel

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

从pipeline开始往外传播

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

Channel中大部分outBound事件都是从tail开始往外传播,__writeAndFlush()__方法是tail继承而来的

| AbstractChannelHandlerContext

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

这里需要注意一点,netty中很多io操作都是异步操作,返回一个ChannelFuture给调用方,调用方拿到这个future可以在适当的时机拿到操作的结果,或者注册回调.

| AbstractChannelHandlerContext

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

netty为了保证程序的高效执行,所有的核心操作都在reactor线程中处理,如果业务线程调用Channel的读写方法,netty会将该操作封装成一个task,随后在reactor线程中执行

| AbstractChannelHandlerContext

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

这里寻找outBound节点与inbound类似

| AbstractChannelHandlerContext

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

调用该节点的ChannelHandler的write方法

| AbstractChannelHandlerContext

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

在使用outBound类型的ChannelHandler中,一般会继承__ChannelOutboundHandlerAdapter__,看看他的__write__方法是怎么处理outBound事件传播的

| ChannelOutboundHandlerAdapter

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

递归调用__ctx.write(msg,promise);__pipeline的双向链表结构中,最后一个outBound节点就是head节点

| HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

实际情况下,outBound类的节点中会有一种特殊类型的节点叫encoder,它的作用是根据自定义编码规则将业务对象转换成ByteBuf,而这类encoder一般继承自__MessageToByteEncoder__

public abstract class DataPacketEncoder extends MessageToByteEncoder<DatePacket> {

    @Override
    protected void encode(ChannelHandlerContext ctx, DatePacket msg, ByteBuf out) throws Exception {
        // 这里拿到业务对象msg的数据,然后调用 out.writeXXX()系列方法编码
    }
}

| MessageToByteEncoder

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 需要判断当前编码器能否处理这类对象
        if (acceptOutboundMessage(msg)) {
            I cast = (I) msg;
            // 分配内存
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }
            // buf到这里已经装载着数据,于是把该buf往前丢,直到head节点
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 如果不能处理,就将outBound事件继续往前面传播
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }

调用__acceptOutboundMessage__方法判断,该encoder是否可以处理msg对应的类的对象,通过后就强行转换,这里的泛型I对应的是__DataPacket__,转换之后,开辟一段内存,调用__encode()__,既 回到DataPacketEncoder中,将buf装满数据,最后如果buf中被写了数据(buf.isReadable()),就将该buf往前丢,一直传递到head节点,被head节点的unsafe消费掉

如果当前encoder不能处理当前业务对象,就简单的将该业务对象向前传播,知道head节点,都处理完之后,释放buf,避免对外内存泄露

pipeline中异常传播

通常在业务代码中,会加入一个异常处理器,统一处理pipeline过程中的所有异常,并且一般该异常处理器需要加在自定义节点的末尾,既

此类ExceptionHandler一般继承自__ChannelDuplexHandler__,标识该节点既是一个inBound节点又是一个outBound节点

inBound异常处理

对于每个节点的数据读取都会调用__AbstractChannelHandlerContext.invokeChannelRead()__方法

| AbstractChannelHandlerContext

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

可以看到该节点最终委托到其内部的ChannelHandler处理channelRead,而在最外层catch整个Throwable,因此如下用户代码中的异常都会被捕获 进入到__notifyHandlerException(t);__往下传播

| AbstractChannelHandlerContext

private void notifyHandlerException(Throwable cause) {
    // 略去了非关键代码,读者可自行分析
    invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
    handler().exceptionCaught(this, cause);
}

可以看到,此Handler中异常优先由此Handler中的__exceptionCaught__方法来处理

| ChannelInboundHandlerAdapter

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
    ctx.fireExceptionCaught(cause);
}

| AbstractChannelHandlerContext

public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    invokeExceptionCaught(next, cause);
    return this;
}

到了这里,已经很清楚了,如果我们在自定义Handler中没有处理异常,那么默认情况下该异常将一直传递下去,遍历每一个节点,直到最后一个自定义异常处理器ExceptionHandler来终结,收编异常

| ExceptionHandler

public Exceptionhandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // 处理该异常,并终止异常的传播
    }
}

这也就是为什么异常处理器要加载pipeline的最后了

outBound的异常处理

前面提到的__writeAndFlush__方法为例,看看outBound事件传播过程中的异常最后如何落到__Exceptionhandler__中去的 前面说到__channel.writeAndFlush()__方法最终也会调用到节点的__invokeFlush0()__方法

| AbstractChannelHandlerContext

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}


private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

而__invokeFlush0()__会委托其内部的ChannelHandler的flush方法,我们一般实现的既是ChannelHandler的flush方法 在flush的过程中发生了异常,会被__notifyHandlerException(t);__捕获,该方法回合inBound事件传播过程中的异常传 播方法一样,也是轮询找到下一个异常处理器,如果异常处理器在pipeline最后面的话,一定会被执行到.

总结

  1. 一个Channel对应一个Unsafe,Unsafe处理底层操作,NioServerSocketChannel对应NioMessageUnsafe, NioSocketChannel对应NioByteUnsafe

  2. inBound事件从head节点传播到tail节点,outBound事件从tail节点传播到head节点

  3. 异常传播只会往后传播,而且不分inbound还是outbound节点,不像outBound事件一样会往前传播

文章来源

简书闪电侠-netty源码分析之pipeline[二]

全部评论: 0

    我有话说: