netty源码分析之writeAndFlush全解析笔记

Dcr 1年前 ⋅ 2407 阅读

数据通过pipeline的流程

  • 1.数据从head节点流入
  • 2.拆包
  • 3.解码成业务对象
  • 4.经过业务handler处理
  • 5.调用write
  • 6.encode将对象编码成ByteBuf
  • 7.ByteBuf到达head节点
  • 8.调用底层Unsafe写道jdk底层管道

java对象编码过程

BusinessHandler

 protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
    Response response = doBusiness(request);
    
    if (response != null) {
        ctx.channel().write(response);
    }
 }

业务处理器处理后落到Encoder节点

Encoder

public class Encoder extends MessageToByteEncoder<Response> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
        out.writeByte(response.getVersion());
        out.writeInt(4 + response.getData().length);
        out.writeBytes(response.getData());
    }
}

MessageToByteEncoder

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 判断当前Handelr是否能处理写入的消息
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            // 强制换换
            I cast = (I) msg;
            // 分配一段ButeBuf
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
            // 调用encode,这里就调回到  `Encoder` 这个Handelr中    
                encode(ctx, cast, buf);
            } finally {
                // 既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉
                // (当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了)
                ReferenceCountUtil.release(cast);
            }
            // 如果buf中写入了数据,就把buf传到下一个节点
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
            // 否则,释放buf,将空数据传到下一个节点    
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 当buf在pipeline中处理完之后,释放
        if (buf != null) {
            buf.release();
        }
    }
}

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里的msg就是前面__Encoder__节点中,载有java对象数据的自定义ByteBuf对象

AbstractChannel

@Override
public final void write(Object msg, ChannelPromise promise) {
	//确保该方法的调用在reactor线程中
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

    int size;
    try {
    	//将待写入的对象过滤,把非byteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer
        msg = filterOutboundMessage(msg);
        //估算出需要写入的byteBuf的size
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

接下来看看__filterOutboundMessage__方法

AbstractNioByteChannel

@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }

        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

重点看看__addMessage__方法

ChannelOutboundBuffer

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 创建一个待写出的消息节点
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(size, false);
}

ChannelOutboundBuffer

ChannelOutboundBuffer里面的数据结构是一个单链表结构,每个节点是一个__Entry__,Entry__里面包含了待写出__ByteBuf__以及消息回调__promise,结构如下图

1-1.png

  • flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
  • unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区的节点
  • tailEntry 指针表示ChannelOutboundBuffer缓冲区的最后一个节点

初次调用__addMessage__之后,各个指针的情况

1-2.png

fushedEntry 指向空, unFushedEntrytailEntry 都指向新加入的节点

第二次调用__addMessage__

1-3.png

第n次调用__addMessage__ 之后,各个指针的情况为

1-4.png

调用n次__addMessage__, flushedEntry指针一直指向null,表示现在还未有节点需要写出到Socket缓冲区,而__unFushedEntry__之后有n个节点,表示当前n个节点尚未写出到Socket缓冲区.

flush:刷新写队列

不管调用__channel.flush(), 还是__ctx.flush(), 最终都会落到pipeline中的head节点

HeadContext

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

AbstractUnsafe

public final void flush() {
   assertEventLoop();

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
   if (outboundBuffer == null) {
       return;
   }

   outboundBuffer.addFlush();
   flush0();
}

ChannelOutboundBuffer

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

首先拿到__unFlushedEntry__ (第一个未被写入到操作系统Socket缓冲区的节点)指针,然后将__flushedEntry__指向__unFlushedEntry__所指向的节点

1-5.png

接下来调用__flush0()__

AbstractUnsafe

protected void flush0() {
    doWrite(outboundBuffer);
}

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            // 强转为ByteBuf,若发现没有数据可读,直接删除该节点
            ByteBuf buf = (ByteBuf) msg;

            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋锁迭代次数
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,将当前节点写出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}

1.current() 拿到第一个需要flush的节点的数据

channelOutBoundBuffer

public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}

2.获取自旋锁的迭代次数

if (writeSpinCount == -1) {
    writeSpinCount = config().getWriteSpinCount();
}

ChannelConfig

/**
 * Returns the maximum loop count for a write operation until
 * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
 * It is similar to what a spin lock is used for in concurrency programming.
 * It improves memory utilization and write throughput depending on
 * the platform that JVM runs on.  The default value is {@code 16}.
 */
int getWriteSpinCount();

3.自旋的方式将byteBuf写出到jdk nio的Channel

for (int i = writeSpinCount - 1; i >= 0; i --) {
    int localFlushedAmount = doWriteBytes(buf);
    if (localFlushedAmount == 0) {
        setOpWrite = true;
        break;
    }

    flushedAmount += localFlushedAmount;
    if (!buf.isReadable()) {
        done = true;
        break;
    }
}

doWriteBytes

protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

出现__javaChannel()__说明已经进入jdk nio Channel的领域

4.删除该节点

ChannelOutBoundBuffer

public boolean remove() {
    Entry e = flushedEntry;
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
    }

    // recycle the entry
    e.recycle();

    return true;
}

拿到当前被flush掉的节点(flushedEntry所指),然后拿到该节点的回调对象__ChannelPromise__,调用__removeEntry()__方法移除该节点

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}

1-6.png

释放该节点数据的内存,调用__safeSuccess__回调

writeAndFlush:写队列并刷新

TailContext

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } 
}

通过flush表示调用__invokeWriteAndFlush__还是__invokerWrite__,__invokeWrite__便是上文中的__write__过程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    invokeWrite0(msg, promise);
    invokeFlush0();
}

最终调用的底层方法和单独调用__write__和__flush__是一样的

private void invokeWrite(Object msg, ChannelPromise promise) {
        invokeWrite0(msg, promise);
}

private void invokeFlush(Object msg, ChannelPromise promise) {
        invokeFlush0(msg, promise);
}

总结

1.pipeline中的编码器原理是创建一个ByteBuf,将java对象转换为ByteBuf,然后再把ByteBuf继续向前传递 2.调用write方法并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出 3.writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功 4.netty中的缓冲区中的ByteBuf为DirectByteBuf

参考文章

简书闪电侠-netty源码分析之writeAndFlush全解析

全部评论: 0

    我有话说: