深入学习Netty-笔记

Dcr 1年前 ⋅ 1041 阅读

Netty是什么,解决了什么问题

  • 官方解释,是什么 | Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

  • 解决了什么问题 | Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

netty是一个异步事件驱动的NIO网络应用框架.快速开发可维护的高性能协议服务端和客户端,极大的简化了网络编程.

网络编程界的Hello,World!

开始之前先写一个最简单的协议DISCARD协议,唯一需要做的就是忽略所有接收到的数据. 案例中用到的netty版本:4.1.72.Final

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  • DiscardServerHandler继承了ChannelInboundHandlerAdapter,这是一个ChannelInboundHandler的实现.ChannelInboundHandler提供了可以覆盖的各种事件处理方法. 这里重写channelRead事件,每当接收到客户端的数据时将使用接收到的消息调用此方法. 实现DISCARD协议,处理程序只要忽略接收到消息.ByteBuf是一个引用计数的对象,必须通过release()方法显式释放. 注意释放传递给处理程序的引用计数对象是处理程序的责任,通常channelRead()的处理方法是这样实现的.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  • 当Netty由于IO错误或处理事件时由于抛出的异常而引发异常时,exceptionCaught()事件处理程序方法使用Throwable调用.在大多数情况下,应该记录捕获的异常,并且在这里应该关闭与之关联的通道.

接下来DISCARD服务端的前半部分就实现好了,接下来编写main()方法,启动DiscardServerHandler服务端.

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}
  • NioEventLoopGroup是一个多线程事件循环器用于处理I/O操作. Netty为不同类型的传输提供了各种EventLoopGroup实现,在这个案例中,我们实现一个服务端应用程序,因此使用两个 NioEventLoopGroup.一个"boss"接收请求,一个"worker",boss接收到的请求将连接传递给worker执行.使用多少线程以及他们如何映射到创建的channels取决于EventLoopGroup实现,甚至可以通过构造参数进行配置.
  • ServerBootstrap是一个设置服务器的助手类.
  • 可以直接使用channel设置服务器,但是这是一个繁琐的过程,大多数情况下不需要操作它.
  • NioServerSocketChannel类,用于实例化一个新的Channel以接收传入的连接.
  • ChannelLInitializer是一个特殊的处理程序,目的时帮助用户配置一个新的Channel. 我们重写的DiscardServerHandler就需要通过这个类来配置到ChannelPipeline中,随着应用程序变得复杂,可以向管道中添加更多的处理程序,并最终将这个匿名类提取到父类中. 接下来就剩下绑定端口号启动服务了.

到这里就用netty完成了,网络编程的hello,world了!!

接收数据

因为上面实现的时一个Discard Server 所以你得不到任何回应,只能看到他启动了,为了证明它能够正常运行需要把它接收到的数据打印出来. 目前我们已经知道每次接收到数据都会调用channelRead方法.那我们就改造channelRead方法验证一下.

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}

其实这里可以简化成in.release().

然后重新启动通过浏览器请求或者telnet的命令访问该服务就可以将请求打印出来,这里我们请求http://localhost:8080?text=hello,world

GET /?text=hello,world HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Cache-Control: max-age=0
sec-ch-ua: " Not A;Brand";v="99", "Chromium";v="96", "Google Chrome";v="96"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8

响应服务

到目前为止我们一直在消耗数据而没有做出任何响应.然而服务器通常应该响应请求.接下来通过实现ECHO协议向客户端写入响应信息,在该协议中,任何接收到的数据都直接返回,依旧是修改channelRead方法即可.

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  • channelHandlerContext对象提供各种操作,是你能够触发各种I/O事件与操作.在这里直接调用write(Object)写入接收到的消息.注意这里不再是Discard例子中释放接收到的消息,而是写入后Netty为你释放. write方法不会直接将消息写入连接,而是在内部缓冲,然后由flush方法写入.或者直接调用writeAndFlush方法.

时间服务器

接下来尝试一个完整的服务端/客户端的案例,实现TIME协议,在这个例子中展示构造和发送消息以及完成时关闭连接.


public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

因为忽略任何接收到的数据,建立连接直接返回所以这次不用channelRead方法,重写channelActive方法

这里我们声明一个32位整数来表示当前时间. 为了发送一条消息,需要分配一个缓冲区来包含这条消息.32位整数容量至少为4个字节.通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分配一个新的缓冲区.

时间服务器客户端

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  • Bootstrap类似于ServerBootstrap,只是它用于非服务器通道,比如客户端通道或无连接通道. 注意客户端用的时connect方法而不是bind方法.

它从服务端接收一个32位的整数,并将其转换成人类可读的格式打印出来然后关闭连接.

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在TCP/IP中,Netty读取从对等端发送的数据到ByteBuf.看起来非常简单,与服务端实例没什么不同,但是这个处理程序有可能抛出IndexOutOfBoudsException异常.

为什么会抛出IndexOutOfBoudsException异常

处理基于流处理,接收到的数据存储在socket的缓冲区中,而且基于流传输的不是包队列而是字节队列.这意味着即使你将两个消息作为两个独立的包发送,操作系统也不会将他们看作两个消息,而只是看作一串字节,因此不能保证你所读的内容与远程同行所写的内容完全一致.例如操作系统的TCP/IP栈收到了三个数据包:

ABC DEF GHI

由于基于流的协议这一属性,在你的应用程序里很有可能会以以下形势读取到:

AB CDEFG H I

因此,接收部分,无论是服务端还是客户端,都应该将接收到的数据进行整理,整理成一个或多个有意义的结构,以便应用程序能够轻松理解成原来的样子:

ABC DEF GHI

第一种解决方案

看回TIME客户端的案例,因为32位的数据量太小不太可能被拆分,但是随着流量的增加碎片化的可能性会增加. 简单的解决方案就是创建一个内部缓冲区,当4个字节都被接收到缓冲区再反序列化数据.

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

第二种解决方案

虽然第一个解决方案解决了消息碎片化的问题,但是修改完之后看起来不够干净和优雅. 其实可以向channelpipeline种添加多个channelhandler,将单个处理操作拆分解耦成多个模块化的channelHandler,以降低复杂性.例如将TimeClientHandler分解成两个handler,时间解码器和处理碎片化问题处理器.

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}

这样就把解码和处理碎片化问题拆分开.

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});
  • 注意实际开发过程中尽量使用POJO而不是ByteBuf 这样才能更好的提高代码的可读性,可维护性和重用性.

关闭应用程序

关闭一个Netty应用程序通常和关闭你通过shutdown优雅()创建的所有EventLoopGroups一样简单。它返回一个Future,通知你EventLoopGroup已经完全终止,所有属于组的通道已经关闭。

参考资料

netty官方文档

全部评论: 0

    我有话说: