ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] # 简介 **nio优势不在于数据传送的速度** nio是new io的简称,在jdk1.4里面提供的新api 为所有的原始类型提供(Buffer)缓存支持 字符集编码解码解决方案 channel:一个新的原始i/o抽象 支持锁和内存映射文件的文件访问接口 提供多路(non-bloking)非阻塞式的高伸缩性网络I/O # socket /nio原理 1. 阻塞和非阻塞 阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式。 当数据没有准备好的时候, 阻塞:往往需要等待缓冲区中的数据准备好之后才处理,否则一直等待。 非阻塞:当我们的进程访问我们的数据缓冲区的时候,数据没有准备好的时候,直接返回,不需要等待。有数据的时候,也直接返回 2. 同步和异步 同步和异步都是基于应用程序和操作系统处理IO事件所采用的方式: 同步:应用程序要直接参与IO事件的操作; 异步:所有的IO读写事件交给操作系统去处理; 同步的方式在处理IO事件的时候,必须阻塞在某个方法上面等待我们的IO事件完成(阻塞在IO事件或者通过轮询IO事件的方式);对于异步来说,所有的IO读写都交给了操作系统,这个时候,我们可以去做其他的事情,并不需要去完成真正的IO操作。当操作系统完成IO之后,给我们的应用程序一个通知就可以了。 同步有两种实现模式: 1. 阻塞到IO事件 阻塞到read 或者 write 方法上,这个时候我们就完全不能做自己的事情。(在这种情况下,我们只能把读写方法放置到线程中,然后阻塞线程的方式来实现并发服务,对线程的性能开销比较大) 2. IO事件的轮询 --在linux c语言编程中叫做多路复用技术(select模式) 读写事件交给一个专门的线程来处理,这个线程完成IO事件的注册功能,还有就是不断地去轮询我们的读写缓冲区(操作系统),看是否有数据准备好,然后通知我们的相应的业务处理线程。这样的话,我们的业务处理线程就可以做其他的事情。在这种模式下,阻塞的不是所有的IO线程,而是阻塞的只是select线程 比喻说明: Client Selector 管家 BOSS 当客人来的时候,就给管家说,我来了(注册),管家得到这个注册信息后,就给BOSS说,我这里有一个或者多个客人。BOSS就说你去给某人A这件东西(IO数据),给另外一个人B另一件东西。这个时候,客人是可以去做自己的事情(比如看看花园等等),当管家知道BOSS给他任务后,他就会去找对应的某人(根据客人的注册信息),告诉他BOSS给他了某样东西。 # JAVA IO模型 基于以上4中IO模型,JAVA对应的实现有: BIO--同步阻塞: JDK1.4以前我们使用的都是BIO 阻塞到我们的读写方法,阻塞到线程来提高并发性能,但是效果不是很好 NIO--同步非阻塞:JDK1.4 linux多路复用技术(select模式) 实现IO事件的轮询方式:同步非阻塞的模式,这种方式目前是主流的网络通信模式 mina netty ——网络通信框架,比自己写NIO要容易些,并且代码可读性更好 AIO:JDK1.7(NIO2)真正的异步非阻塞IO(基于linux的epoll模式) 小结: 1)BIO阻塞的IO 2)NIO select多路复用+非阻塞 同步非阻塞 3)AIO异步非阻塞IO # NIO详解 New IO成功的解决了上述问题,它是怎样解决的呢? IO处理客户端请求的最小单位是线程 而NIO使用了比线程还小一级的单位:通道(Channel) 可以说,NIO中只需要一个线程就能完成所有接收,读,写等操作 要学习NIO,首先要理解它的三大核心 Selector,选择器 Buffer,缓冲区 Channel,通道 **Buffer** 首先要知道什么是Buffer 在NIO中数据交互不再像IO机制那样使用流 而是使用Buffer(缓冲区) 可以看出Buffer在整个工作流程中的位置 buffer实际上是一个容器,一个连续数组,它通过几个变量来保存这个数据的当前位置状态: 1. capacity:容量,缓冲区能容纳元素的数量 2. position:当前位置,是缓冲区中下一次发生读取和写入操作的索引,当前位置通过大多数读写操作向前推进 3. limit:界限,是缓冲区中最后一个有效位置之后下一个位置的索引 如图: ![](https://box.kancloud.cn/28959b3f44c5a9aa5bf630ca7dc817c8_1442x492.png) 几个常用方法: ~~~ .flip() //将limit设置为position,然后position重置为0,返回对缓冲区的引用 .clear() //清空调用缓冲区并返回对缓冲区的引用 ~~~ 来点实际点的,上面图中的具体代码如下: 1. 首先给Buffer分配空间,以字节为单位 ~~~ ByteBuffer byteBuffer = ByteBuffer.allocate(1024); ~~~ 创建一个ByteBuffer对象并且指定内存大小 2. 向Buffer中写入数据: ~~~ 1).数据从Channel到Buffer:channel.read(byteBuffer); 2).数据从Client到Buffer:byteBuffer.put(...); ~~~ 3. 从Buffer中读取数据: ~~~ 1).数据从Buffer到Channel:channel.write(byteBuffer); 2).数据从Buffer到Server:byteBuffer.get(...); ~~~ **Selector** 选择器是NIO的核心,它是channel的管理者 通过执行select()阻塞方法,监听是否有channel准备好 一旦有数据可读,此方法的返回值是SelectionKey的数量 所以服务端通常会死循环执行select()方法,直到有channl准备就绪,然后开始工作 每个channel都会和Selector绑定一个事件,然后生成一个SelectionKey的对象 需要注意的是: channel和Selector绑定时,channel必须是非阻塞模式 而FileChannel不能切换到非阻塞模式,因为它不是套接字通道,所以FileChannel不能和Selector绑定事件 在NIO中一共有四种事件: 1. SelectionKey.OP_CONNECT:连接事件 2. SelectionKey.OP_ACCEPT:接收事件 3. SelectionKey.OP_READ:读事件 4. SelectionKey.OP_WRITE:写事件 **Channel** 共有四种通道: FileChannel:作用于IO文件流 DatagramChannel:作用于UDP协议 SocketChannel:作用于TCP协议 ServerSocketChannel:作用于TCP协议 本篇文章通过常用的TCP协议来讲解NIO 我们以ServerSocketChannel为例: 打开一个ServerSocketChannel通道 ~~~ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ~~~ 关闭ServerSocketChannel通道: ~~~ serverSocketChannel.close(); ~~~ 循环监听SocketChannel: ~~~ while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); } ~~~ `clientChannel.configureBlocking(false);`语句是将此通道设置为非阻塞,也就是异步 自由控制阻塞或非阻塞便是NIO的特性之一 **SelectionKey** SelectionKey是通道和选择器交互的核心组件 比如在SocketChannel上绑定一个Selector,并注册为连接事件: ~~~ SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); clientChannel.connect(new InetSocketAddress(port)); clientChannel.register(selector, SelectionKey.OP_CONNECT); ~~~ 核心在register()方法,它返回一个SelectionKey对象 来检测channel事件是那种事件可以使用以下方法: ~~~ selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable(); ~~~ 服务端便是通过这些方法 在轮询中执行相对应操作 当然通过Channel与Selector绑定的key也可以反过来拿到他们 ~~~ Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector(); ~~~ 在Channel上注册事件时,我们也可以顺带绑定一个Buffer: ~~~ clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024)); ~~~ 或者绑定一个Object: ~~~ selectionKey.attach(Object); Object anthorObj = selectionKey.attachment(); ~~~ # NIO代码 **编解码类CharsetHelper** ~~~ package com.nio; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; public class CharsetHelper { private static final String UTF_8 = "UTF-8"; private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder(); private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder(); public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException { return encoder.encode(in); } public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{ return decoder.decode(in); } } ~~~ **服务端类NioServer** ~~~ package com.nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.*; import java.util.Iterator; public class NioServer { //和操作系统交互的缓存 private ByteBuffer readBuffer; //轮询器 private Selector selector; public static void main(String[] args) { NioServer server = new NioServer(); server.init(); System.out.println("server started --> 8383"); server.listen(); } private void init() { //分配个缓存 readBuffer = ByteBuffer.allocate(1024); ServerSocketChannel serverSocketChannel; try { //创建一个socket channel; channel是nio中对通信通道的抽象,不分入站出站方向 serverSocketChannel = ServerSocketChannel.open(); //设置通道为非阻塞的方式 serverSocketChannel.configureBlocking(false); //将通道绑定在服务器的ip地址和某个端口上 serverSocketChannel.socket().bind(new InetSocketAddress(8383)); //打开一个多路复用器 selector = Selector.open(); //将上面创建好的socket channel注册到selector多路复用器上 //对于复用端来说,一定要先注册一个OP_ACCEPT事件用来响应客户端的连接请求 //将上述的通道管理器和通道绑定,并为该通道注册OP_ACCEPT事件 //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (Exception e) { e.printStackTrace(); } } private void listen() { while(true) { try { //去询问一次selector选择器 //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个) selector.select(); //拿到事件的key //如果channel有数据了,将生成的key访入keys集合中,得到这个keys集合的迭代器 Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); //使用迭代器遍历集合 while (ite.hasNext()) { //遍历到一个事件key,得到集合中的一个key实例 SelectionKey key = ite.next(); //确保不重复处理 //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错 ite.remove(); //处理事件 handlekey(key); } }catch (Exception e) { e.printStackTrace(); } } } //处理事件 private void handlekey(SelectionKey key) { SocketChannel channel = null; try { //这个key是可接受连接的吗 if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //接收连接请求 channel = serverChannel.accept(); channel.configureBlocking(false); //对读的事件感兴趣 channel.register(selector,SelectionKey.OP_READ); //这个事件是可读的吗 }else if(key.isReadable()) { channel = (SocketChannel) key.channel(); //先清空一下buffer,因为要用,以防是老的数据 readBuffer.clear(); /** * 当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1 * 所以这时服务器端也需要关闭channel,避免无限无效的处理 * 把消息read到readBuffer中 */ int count = channel.read(readBuffer); if (count > 0) { // 一定需要调用flip函数,否则读取错误数据 // 简单来说,flip操作就是让读写指针、limit指针复位到正确的位置 readBuffer.flip(); /* * 使用CharBuffer配合取出正确的数据; * String question = new String(readBuffer.array());可能会出错, * 因为前面readBuffer.clear();并未真正清理数据 只是重置缓冲区的position, * limit, mark, 而readBuffer.array()会返回整个缓冲区的内容。 * decode方法只取readBuffer的position到limit数据。 * 例如,上一次读取到缓冲区的是"where", clear后position为0,limit为 1024, * 再次读取“bye"到缓冲区后,position为3,limit不变, * flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中, 所以 new * String(readBuffer.array()) 返回 "byere", * 而decode(readBuffer)返回"bye"。 */ CharBuffer charBuffer = CharsetHelper.decode(readBuffer); String question = charBuffer.toString(); // 根据客户端的请求,调用相应的业务方法获取业务结果 String answer = getAnswer(question); channel.write(CharsetHelper.encode(CharBuffer.wrap(answer))); } else { // 这里关闭channel,因为客户端已经关闭channel或者异常了 channel.close(); } } }catch (Exception e) { e.printStackTrace(); } } private String getAnswer(String question) { String answer = null; if ("who".equals(question)) { answer = "我是凤姐\n"; } else if ("what".equals(question)) { answer = "我是来帮你解闷的\n"; } else if ("where".equals(question)) { answer = "我来自外太空\n"; } else if ("hi".equals(question)) { answer = "hello\n"; } else if ("bye".equals(question)) { answer = "88\n"; } else { answer = "请输入 who, 或者what, 或者where"; } return answer; } } ~~~ **客户端NioClient** ~~~ package com.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class NioClient implements Runnable{ private BlockingQueue<String> words; private Random random; public static void main(String[] args) { // 多个线程发起Socket客户端连接请求 for(int i=0; i<10; i++){ NioClient c = new NioClient(); c.init(); new Thread(c).start(); } } @Override public void run() { SocketChannel channel = null; Selector selector = null; try { channel = SocketChannel.open(); channel.configureBlocking(false); // 主动请求连接 channel.connect(new InetSocketAddress("localhost", 8383)); selector = Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); boolean isOver = false; while(! isOver){ selector.select(); Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = ite.next(); ite.remove(); if(key.isConnectable()){ if(channel.isConnectionPending()){ if(channel.finishConnect()){ //只有当连接成功后才能注册OP_READ事件 key.interestOps(SelectionKey.OP_READ); channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord()))); sleep(); } else{ key.cancel(); } } } else if(key.isReadable()){ ByteBuffer byteBuffer = ByteBuffer.allocate(128); channel.read(byteBuffer); byteBuffer.flip(); CharBuffer charBuffer = CharsetHelper.decode(byteBuffer); String answer = charBuffer.toString(); System.out.println(Thread.currentThread().getId() + "---" + answer); String word = getWord(); if(word != null){ channel.write(CharsetHelper.encode(CharBuffer.wrap(word))); } else{ isOver = true; } sleep(); } } } } catch (IOException e) { // TODO } finally{ if(channel != null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void init() { words = new ArrayBlockingQueue<String>(5); try { words.put("hi"); words.put("who"); words.put("what"); words.put("where"); words.put("bye"); } catch (InterruptedException e) { e.printStackTrace(); } random = new Random(); } private String getWord(){ return words.poll(); } private void sleep() { try { TimeUnit.SECONDS.sleep(random.nextInt(3)); } catch (InterruptedException e) { e.printStackTrace(); } } private void sleep(long l) { try { TimeUnit.SECONDS.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } } } ~~~