深度解析NIO底层

松花皮蛋me 2019-04-11 00:39
文章首发于公众号 松花皮蛋的黑板报松花皮蛋的黑板报,作者就职于京东,在稳定性保障、敏捷开发、高级JAVA、微服务架构有深入的理解

一、BIO、NIO的介绍

我们可以分别启动高版本tomcat-8和低版本tomcat-6,然后模拟连接 Socket socket = new Socket(“localhost”,8080)会发现BIO和NIO最明显的区别

  1. 1、BIO的应用每来一个请求都会起一个线程去接待。
  2. 2、NIO的应用会有一个accepter线程专门负责接待。

所以NIO在做高并发和高吞吐量服务时比BIO更加适合,并且在长链接的情况下,同一时间对CPU的利用率也更高。

传统的BIO也叫”同步阻塞IO”,还有一种叫做”同步非阻塞I/O”

那么问题来了?我们的NIO 底层用的是那种I/O模型呢?其实是IO多路复用

这里提到了两个概念:

1、select/poll 多路由复用器(这里可以先简单的理解成一个队列,有一个单线程在不断的轮询处理队列里的事件)

2、fd 类似于一种请求事件(Socket描述符).有一点很重要,这里的select轮询是阻塞的。

刚刚说的I/O复用模型可以说是奠定了NIO的模型基础,但是我们的UNIX对这个模型做了进一步的优化,刚刚的图我们可以发现一个明显的问题,什么问题呢?select/poll线程像傻瓜一样的顺序轮询fd列表,而且处理的fd也是有限的。默认是1024个。这个时候 epoll(Event Poll 基于事件驱动的select/poll模型)模型就呼之欲出了。

这里的select轮询同样也是阻塞的。在就绪队列里的数据为空的时候,select内部的监听线程就会阻塞。

所以我们说NIO是一个典型的同步非阻塞的I/O。而底层的IO模型是采用的epoll方式的多路复用模型。(在UNIX和Linux下)

总结一下,epoll模型相比select/poll模型的一些优点

select/poll模型

  1. 1、每次调用select,都需要把fd集合从用户态拷贝到内核态。
  2. 2、每次调用select都需要在内核遍历传递进来的所有fd。
  3. 3、select支持的文件描述符数量太小了,默认是1024。

epoll模型

  1. 1、初次调用select时,会挂载所有的fd进来,并且没有从用户态到内核态的内存复制,而是通过内核和用户空间mmap同一块内存来实现的。
  2. 2、epoll在事件就绪时的触发并没有遍历所有fd,而是遍历就绪态的fd链表,节省了大量的CPU时间。
  3. 3、所支持的fd的上限是操作系统的最大文件句柄数,简单理解,也就是可以支持的连接数。一般来说1GB内存的机器上大约是10W个句柄左右

二、NIO的模型介绍和实现原理

NIO这个概念,早在jdk1.4的时候就支持了,我们完全可以通过jdk中的nio功能模块去实现一个NIO框架的服务。下面给出一个简单的例子

public class NioDemo {


public static void main(String[] args)
{
    try {
        initServer(9999);
        listenSelector();
    } catch (IOException e) {
        e.printStackTrace();
    }

}

private static Selector selector;

public static void initServer(int port) throws IOException {
    //init一个通道,并且打开对连接的接收
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    //设置为非阻塞
    serverSocketChannel.configureBlocking(false);
    //绑定端口
    serverSocketChannel.socket().bind(new InetSocketAddress(port));
    //打开多路复用器,监控监听fd
    selector = Selector.open();
    //注册监听器,SelectionKey.OP_ACCEPT OP_CONNECT OP_READ OP_WRITE
    serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
    System.out.println("服务端启动成功");
}

public static void listenSelector() throws IOException {
    while (true) {
        System.out.println("select开始阻塞");
        selector.select();
        Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
        System.out.println("获取到新的key");
        while (keyIterator.hasNext()) {
            SelectionKey selectionKey = keyIterator.next();
            //删除已选的key,防止重复处理
            keyIterator.remove();
            try {
                handler(selectionKey,keyIterator);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

public static void  handler(SelectionKey selectionKey,Iterator<SelectionKey> keyIterator) throws IOException {

    if(selectionKey.isAcceptable()) {
        System.out.println("新的客户端连接");
        //有新的客户端连接则注册到读就就绪事件
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        channel.register(selector,SelectionKey.OP_READ);
    } else if(selectionKey.isReadable()) {
        //通道可读说明可以从buffer里取数据
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int readData = socketChannel.read(buffer);
        if(readData>0) {
            String msg = new String(buffer.array(),"GBK").trim();
            System.out.println("服务端收到消息"+msg);
            ByteBuffer byteBuffer = ByteBuffer.wrap("我收到你的消费了".getBytes("UTF-8"));
            socketChannel.write(byteBuffer);
        } else {
            System.out.println("客户端关闭");
            selectionKey.cancel();
        }
    } else {
        System.out.println(selectionKey.isValid());
    }
}
}

当我telnet 192.168.0.101 9999时,会打印

服务端启动成功
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞  

当我通过浏览器访问时,会打印

服务端启动成功
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
获取到新的key
新的客户端连接
服务端收到消息GET / HTTP/1.1
Host: 192.168.0.101:9999
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
select开始阻塞
获取到新的key
客户端关闭
select开始阻塞

演示完了上面的代码,可以大概的看到他的执行思路是优先注册链接事件,然后监听这个事件,收到事件后处理完成后,又向select注册接下来的读取就绪、写入就绪事件。我们称这种开发模型为Reactor模型,也叫反应堆模型

事件驱动模型,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers

三、基于NIO实现的Nettty通信框架

Netty是基于NIO实现的网络通信框架,在rpc框架中被广为应用(dubbo、jsf),同时Netty可以支持多种协议的开发,非常适合高并发的网络编程(弹幕、游戏服务器等)
下面是Netty对JDK原生NIO的一些增强

  1. 1、实现了事件分发,和业务执行的线程池隔离。(也就是我们说的IO线程、工作线程职责剥离)
  2. 2、一个NIO服务端处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等情况。而这些在Netty中都得到了很好的解决。
  3. 3、代码编写较复杂,缺少封装,每增加一层处理需要修改的地方有很多,且很难调试。而Netty实现了PipeLine来实现不同的上下行的Handler。
  4. 4、需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。
  5. 5、Netty在健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的。

四、Netty案例演示

这里以弹幕为例,介绍一下Netty在实际项目中的应用。

这里我们使用WebSocket+Netty的方式来实践弹幕的推送,设想一下,如果我们不用Netty能不能实现弹幕系统的功能?肯定是可以实现的:

简单暴力的方案:Ajax轮询去服务器取消息。客户端按照某个时间间隔不断地向服务端发送请求,请求服务端的最新数据然后更新客户端显示。这种方式实际上浪费了大量流量并且对服务端造成了很大压力。

以上方案的弊端:

  1. 1、Http为半双工超文本协议,也就是说同一时刻,只有一个方向的数据传送。
  2. 2、Http消息冗长,包含请求行、请求头、请求体。占用很多的带宽和服务器资源。
  3. 3、空轮询问题。

使用WebSocket,可以很好的解决http协议带来的问题。

webSocket特点如下:

  1. 1、单一TCP长连接,采用全双工通信模式。这是一个二进制的协议
  2. 2、对代理、防火墙透明。
  3. 3、无头部信息、消息更精简。
  4. 4、通过ping/pong 来保活。
  5. 5、服务器可以主动推送消息给客户端,不在需要客户轮询

直接看代码吧

WebsocketDanmuServer.clss

/**
 * 基于Websocket的弹幕服务
 */
public class WebsocketDanmuServer {

private static final Integer port = 7777;

public static void main(String[] args) {

    ServerBootstrap bootstrap = new ServerBootstrap();

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new WebsocketDanmuChannelInitializer());

    try {
        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", port).sync();
        System.out.println("弹幕服务器启动,网址是 : " + "http://127.0.0.1:" + port);
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
}

WebsocketDanmuChannelInitializer

    /**
 * 弹幕服务的上下行handler
 */
    public class WebsocketDanmuChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline channelPipeline = socketChannel.pipeline();
        channelPipeline.addLast("http-decode", new HttpRequestDecoder());//解码
        channelPipeline.addLast("http-encode", new HttpResponseEncoder());//编码
        channelPipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
        channelPipeline.addLast("http-chunked", new ChunkedWriteHandler());

        channelPipeline.addLast("http-request",new HttpRequestHandler("/ws"));
        channelPipeline.addLast("WebSocket-protocol",new WebSocketServerProtocolHandler("/ws"));
        channelPipeline.addLast("WebSocket-request",new TextWebSocketFrameHandler());
    }

}

HttpRequestHandler

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

private final String wsUri;
private static final File INDEX;

static {
    URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
    try {
        String path = location.toURI() + "WebsocketDanMu.html";
        path = !path.contains("file:") ? path : path.substring(5);
        INDEX = new File(path);
    } catch (URISyntaxException e) {
        throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
    }
}

public HttpRequestHandler(String wsUri) {
    this.wsUri = wsUri;
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
    if (wsUri.equalsIgnoreCase(fullHttpRequest.getUri())) {
        channelHandlerContext.fireChannelRead(fullHttpRequest.retain()); 
    } else {
        if (HttpHeaders.is100ContinueExpected(fullHttpRequest)) {
            send100Continue(channelHandlerContext);                               
        }

        RandomAccessFile file = new RandomAccessFile(INDEX, "r");

        HttpResponse response = new DefaultHttpResponse(fullHttpRequest.getProtocolVersion(), HttpResponseStatus.OK);
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

        boolean keepAlive = HttpHeaders.isKeepAlive(fullHttpRequest);

        if (keepAlive) {          
            response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
            response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        }
        channelHandlerContext.write(response);                    

        if (channelHandlerContext.pipeline().get(SslHandler.class) == null) {     
            channelHandlerContext.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
        } else {
            channelHandlerContext.write(new ChunkedNioFile(file.getChannel()));
        }
        ChannelFuture future = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);          
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);        
        }

        file.close();
    }
}

private static void send100Continue(ChannelHandlerContext ctx) {
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
    ctx.writeAndFlush(response);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:" + incoming.remoteAddress() + "异常");
    // 当出现异常就关闭连接
    cause.printStackTrace();
    ctx.close();
}

TextWebSocketFrameHandler

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
    Channel incoming = channelHandlerContext.channel();
    for (Channel channel : channels) {
        if (channel != incoming) {
            channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
        } else {
            channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
        }
    }
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
    channels.add(incoming);
    System.out.println("Client:" + incoming.remoteAddress() + "加入");
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
    System.err.println("Client:" + incoming.remoteAddress() + "离开");
    // 不需要手动remove"channels.remove(ctx.channel());"
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:" + incoming.remoteAddress() + "在线");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:" + incoming.remoteAddress() + "掉线");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel incoming = ctx.channel();
    System.err.println("Client:" + incoming.remoteAddress() + "异常");
    // 当出现异常就关闭连接
    cause.printStackTrace();
    ctx.close();
}
}