数据通过pipeline的流程
- 1.数据从head节点流入
- 2.拆包
- 3.解码成业务对象
- 4.经过业务handler处理
- 5.调用write
- 6.encode将对象编码成ByteBuf
- 7.ByteBuf到达head节点
- 8.调用底层Unsafe写道jdk底层管道
java对象编码过程
BusinessHandler
protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
Response response = doBusiness(request);
if (response != null) {
ctx.channel().write(response);
}
}
业务处理器处理后落到Encoder节点
Encoder
public class Encoder extends MessageToByteEncoder<Response> {
@Override
protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
out.writeByte(response.getVersion());
out.writeInt(4 + response.getData().length);
out.writeBytes(response.getData());
}
}
MessageToByteEncoder
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判断当前Handelr是否能处理写入的消息
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
// 强制换换
I cast = (I) msg;
// 分配一段ButeBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 调用encode,这里就调回到 `Encoder` 这个Handelr中
encode(ctx, cast, buf);
} finally {
// 既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉
// (当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了)
ReferenceCountUtil.release(cast);
}
// 如果buf中写入了数据,就把buf传到下一个节点
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 否则,释放buf,将空数据传到下一个节点
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 当buf在pipeline中处理完之后,释放
if (buf != null) {
buf.release();
}
}
}
HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
这里的msg就是前面__Encoder__节点中,载有java对象数据的自定义ByteBuf对象
AbstractChannel
@Override
public final void write(Object msg, ChannelPromise promise) {
//确保该方法的调用在reactor线程中
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
//将待写入的对象过滤,把非byteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer
msg = filterOutboundMessage(msg);
//估算出需要写入的byteBuf的size
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
接下来看看__filterOutboundMessage__方法
AbstractNioByteChannel
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
重点看看__addMessage__方法
ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 创建一个待写出的消息节点
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(size, false);
}
ChannelOutboundBuffer
ChannelOutboundBuffer里面的数据结构是一个单链表结构,每个节点是一个__Entry__,Entry__里面包含了待写出__ByteBuf__以及消息回调__promise,结构如下图
- flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
- unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区的节点
- tailEntry 指针表示ChannelOutboundBuffer缓冲区的最后一个节点
初次调用__addMessage__之后,各个指针的情况
fushedEntry 指向空, unFushedEntry 和 tailEntry 都指向新加入的节点
第二次调用__addMessage__
第n次调用__addMessage__ 之后,各个指针的情况为
调用n次__addMessage__, flushedEntry指针一直指向null,表示现在还未有节点需要写出到Socket缓冲区,而__unFushedEntry__之后有n个节点,表示当前n个节点尚未写出到Socket缓冲区.
flush:刷新写队列
不管调用__channel.flush(), 还是__ctx.flush(), 最终都会落到pipeline中的head节点
HeadContext
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
AbstractUnsafe
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
ChannelOutboundBuffer
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
首先拿到__unFlushedEntry__ (第一个未被写入到操作系统Socket缓冲区的节点)指针,然后将__flushedEntry__指向__unFlushedEntry__所指向的节点
接下来调用__flush0()__
AbstractUnsafe
protected void flush0() {
doWrite(outboundBuffer);
}
AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
// 拿到第一个需要flush的节点的数据
Object msg = in.current();
if (msg instanceof ByteBuf) {
// 强转为ByteBuf,若发现没有数据可读,直接删除该节点
ByteBuf buf = (ByteBuf) msg;
boolean done = false;
long flushedAmount = 0;
// 拿到自旋锁迭代次数
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
// 自旋,将当前节点写出
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
// 写完之后,将当前节点删除
if (done) {
in.remove();
} else {
break;
}
}
}
}
1.current() 拿到第一个需要flush的节点的数据
channelOutBoundBuffer
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
2.获取自旋锁的迭代次数
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
ChannelConfig
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
3.自旋的方式将byteBuf写出到jdk nio的Channel
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
doWriteBytes
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
出现__javaChannel()__说明已经进入jdk nio Channel的领域
4.删除该节点
ChannelOutBoundBuffer
public boolean remove() {
Entry e = flushedEntry;
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
}
// recycle the entry
e.recycle();
return true;
}
拿到当前被flush掉的节点(flushedEntry所指),然后拿到该节点的回调对象__ChannelPromise__,调用__removeEntry()__方法移除该节点
private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
释放该节点数据的内存,调用__safeSuccess__回调
writeAndFlush:写队列并刷新
TailContext
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
}
}
通过flush表示调用__invokeWriteAndFlush__还是__invokerWrite__,__invokeWrite__便是上文中的__write__过程
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
invokeFlush0();
}
最终调用的底层方法和单独调用__write__和__flush__是一样的
private void invokeWrite(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
}
private void invokeFlush(Object msg, ChannelPromise promise) {
invokeFlush0(msg, promise);
}
总结
1.pipeline中的编码器原理是创建一个ByteBuf,将java对象转换为ByteBuf,然后再把ByteBuf继续向前传递 2.调用write方法并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出 3.writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功 4.netty中的缓冲区中的ByteBuf为DirectByteBuf