扫一扫
关注公众号
我们可以分别启动高版本tomcat-8和低版本tomcat-6,然后模拟连接 Socket socket = new Socket(“localhost”,8080)会发现BIO和NIO最明显的区别
所以NIO在做高并发和高吞吐量服务时比BIO更加适合,并且在长链接的情况下,同一时间对CPU的利用率也更高。
传统的BIO也叫”同步阻塞IO”,还有一种叫做”同步非阻塞I/O”
那么问题来了?我们的NIO 底层用的是那种I/O模型呢?其实是IO多路复用
这里提到了两个概念:
1、select/poll 多路由复用器(这里可以先简单的理解成一个队列,有一个单线程在不断的轮询处理队列里的事件)
2、fd 类似于一种请求事件(Socket描述符).有一点很重要,这里的select轮询是阻塞的。
刚刚说的I/O复用模型可以说是奠定了NIO的模型基础,但是我们的UNIX对这个模型做了进一步的优化,刚刚的图我们可以发现一个明显的问题,什么问题呢?select/poll线程像傻瓜一样的顺序轮询fd列表,而且处理的fd也是有限的。默认是1024个。这个时候 epoll(Event Poll 基于事件驱动的select/poll模型)模型就呼之欲出了。
这里的select轮询同样也是阻塞的。在就绪队列里的数据为空的时候,select内部的监听线程就会阻塞。
所以我们说NIO是一个典型的同步非阻塞的I/O。而底层的IO模型是采用的epoll方式的多路复用模型。(在UNIX和Linux下)
总结一下,epoll模型相比select/poll模型的一些优点
select/poll模型
epoll模型
NIO这个概念,早在jdk1.4的时候就支持了,我们完全可以通过jdk中的nio功能模块去实现一个NIO框架的服务。下面给出一个简单的例子
public class NioDemo {
public static void main(String[] args)
{
try {
initServer(9999);
listenSelector();
} catch (IOException e) {
e.printStackTrace();
}
}
private static Selector selector;
public static void initServer(int port) throws IOException {
//init一个通道,并且打开对连接的接收
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
//打开多路复用器,监控监听fd
selector = Selector.open();
//注册监听器,SelectionKey.OP_ACCEPT OP_CONNECT OP_READ OP_WRITE
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功");
}
public static void listenSelector() throws IOException {
while (true) {
System.out.println("select开始阻塞");
selector.select();
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
System.out.println("获取到新的key");
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
//删除已选的key,防止重复处理
keyIterator.remove();
try {
handler(selectionKey,keyIterator);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void handler(SelectionKey selectionKey,Iterator<SelectionKey> keyIterator) throws IOException {
if(selectionKey.isAcceptable()) {
System.out.println("新的客户端连接");
//有新的客户端连接则注册到读就就绪事件
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
channel.register(selector,SelectionKey.OP_READ);
} else if(selectionKey.isReadable()) {
//通道可读说明可以从buffer里取数据
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readData = socketChannel.read(buffer);
if(readData>0) {
String msg = new String(buffer.array(),"GBK").trim();
System.out.println("服务端收到消息"+msg);
ByteBuffer byteBuffer = ByteBuffer.wrap("我收到你的消费了".getBytes("UTF-8"));
socketChannel.write(byteBuffer);
} else {
System.out.println("客户端关闭");
selectionKey.cancel();
}
} else {
System.out.println(selectionKey.isValid());
}
}
}
当我telnet 192.168.0.101 9999时,会打印
服务端启动成功
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
当我通过浏览器访问时,会打印
服务端启动成功
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
获取到新的key
新的客户端连接
服务端收到消息GET / HTTP/1.1
Host: 192.168.0.101:9999
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
select开始阻塞
获取到新的key
客户端关闭
select开始阻塞
演示完了上面的代码,可以大概的看到他的执行思路是优先注册链接事件,然后监听这个事件,收到事件后处理完成后,又向select注册接下来的读取就绪、写入就绪事件。我们称这种开发模型为Reactor模型,也叫反应堆模型
事件驱动模型,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers
Netty是基于NIO实现的网络通信框架,在rpc框架中被广为应用(dubbo、jsf),同时Netty可以支持多种协议的开发,非常适合高并发的网络编程(弹幕、游戏服务器等)
下面是Netty对JDK原生NIO的一些增强
这里以弹幕为例,介绍一下Netty在实际项目中的应用。
这里我们使用WebSocket+Netty的方式来实践弹幕的推送,设想一下,如果我们不用Netty能不能实现弹幕系统的功能?肯定是可以实现的:
简单暴力的方案:Ajax轮询去服务器取消息。客户端按照某个时间间隔不断地向服务端发送请求,请求服务端的最新数据然后更新客户端显示。这种方式实际上浪费了大量流量并且对服务端造成了很大压力。
以上方案的弊端:
使用WebSocket,可以很好的解决http协议带来的问题。
webSocket特点如下:
直接看代码吧
WebsocketDanmuServer.clss
/**
* 基于Websocket的弹幕服务
*/
public class WebsocketDanmuServer {
private static final Integer port = 7777;
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketDanmuChannelInitializer());
try {
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", port).sync();
System.out.println("弹幕服务器启动,网址是 : " + "http://127.0.0.1:" + port);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
WebsocketDanmuChannelInitializer
/**
* 弹幕服务的上下行handler
*/
public class WebsocketDanmuChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("http-decode", new HttpRequestDecoder());//解码
channelPipeline.addLast("http-encode", new HttpResponseEncoder());//编码
channelPipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
channelPipeline.addLast("http-chunked", new ChunkedWriteHandler());
channelPipeline.addLast("http-request",new HttpRequestHandler("/ws"));
channelPipeline.addLast("WebSocket-protocol",new WebSocketServerProtocolHandler("/ws"));
channelPipeline.addLast("WebSocket-request",new TextWebSocketFrameHandler());
}
}
HttpRequestHandler
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "WebsocketDanMu.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
if (wsUri.equalsIgnoreCase(fullHttpRequest.getUri())) {
channelHandlerContext.fireChannelRead(fullHttpRequest.retain());
} else {
if (HttpHeaders.is100ContinueExpected(fullHttpRequest)) {
send100Continue(channelHandlerContext);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(fullHttpRequest.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(fullHttpRequest);
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
channelHandlerContext.write(response);
if (channelHandlerContext.pipeline().get(SslHandler.class) == null) {
channelHandlerContext.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
channelHandlerContext.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
file.close();
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
TextWebSocketFrameHandler
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
Channel incoming = channelHandlerContext.channel();
for (Channel channel : channels) {
if (channel != incoming) {
channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
} else {
channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
channels.add(incoming);
System.out.println("Client:" + incoming.remoteAddress() + "加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
System.err.println("Client:" + incoming.remoteAddress() + "离开");
// 不需要手动remove"channels.remove(ctx.channel());"
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "在线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "掉线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.err.println("Client:" + incoming.remoteAddress() + "异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}