Netty 源码分析之 番外篇 Java NIO 的前生今世
<h2><strong>简介</strong></h2> <p>Java NIO 是由 Java 1.4 引进的异步 IO.</p> <p>Java NIO 由以下几个核心部分组成:</p> <ul> <li>Channel</li> <li>Buffer</li> <li>Selector</li> </ul> <h3><strong>NIO 和 IO 的对比</strong></h3> <p>IO 和 NIO 的区别主要体现在三个方面:</p> <ul> <li>IO 基于流(Stream oriented), 而 NIO 基于 Buffer (Buffer oriented)</li> <li>IO 操作是阻塞的, 而 NIO 操作是非阻塞的</li> <li>IO 没有 selector 概念, 而 NIO 有 selector 概念.</li> </ul> <p><strong>基于 Stream 与基于 Buffer</strong></p> <p>传统的 IO 是面向字节流或字符流的, 而在 NIO 中, 我们抛弃了传统的 IO 流, 而是引入了 <strong>Channel</strong> 和 <strong>Buffer</strong> 的概念. 在 NIO 中, 我只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel.</p> <p>那么什么是 <strong>基于流</strong> 呢? 在一般的 Java IO 操作中, 我们以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 因此我们也就不能随意改变读取指针的位置.</p> <p>而 <strong>基于 Buffer</strong> 就显得有点不同了. 我们首先需要从 Channel 中读取数据到 Buffer 中, 当 Buffer 中有数据后, 我们就可以对这些数据进行操作了. 不像 IO 那样是顺序操作, NIO 中我们可以随意地读取任意位置的数据.</p> <p><strong>阻塞和非阻塞</strong></p> <p>Java 提供的各种 Stream 操作都是阻塞的, 例如我们调用一个 read 方法读取一个文件的内容, 那么调用 read 的线程会被阻塞住, 直到 read 操作完成.</p> <p>而 NIO 的非阻塞模式允许我们非阻塞地进行 IO 操作. 例如我们需要从网络中读取数据, 在 NIO 的非阻塞模式中, 当我们调用 read 方法时, 如果此时有数据, 则 read 读取并返回; 如果此时没有数据, 则 read 直接返回, 而不会阻塞当前线程.</p> <p><strong>selector</strong></p> <p>selector 是 NIO 中才有的概念, 它是 Java NIO 之所以可以非阻塞地进行 IO 操作的关键.</p> <p>通过 Selector, 一个线程可以监听多个 Channel 的 IO 事件, 当我们向一个 Selector 中注册了 Channel 后, Selector 内部的机制就可以自动地为我们不断地查询(select) 这些注册的 Channel 是否有已就绪的 IO 事件(例如可读, 可写, 网络连接完成等). 通过这样的 Selector 机制, 我们就可以很简单地使用一个线程高效地管理多个 Channel 了.</p> <h2><strong>Java NIO Channel</strong></h2> <p>通常来说, 所有的 NIO 的 I/O 操作都是从 Channel 开始的. 一个 channel 类似于一个 stream.</p> <p>java Stream 和 NIO Channel 对比</p> <ul> <li>我们可以在同一个 Channel 中执行读和写操作, 然而同一个 Stream 仅仅支持读或写.</li> <li>Channel 可以异步地读写, 而 Stream 是阻塞的同步读写.</li> <li>Channel 总是从 Buffer 中读取数据, 或将数据写入到 Buffer 中.</li> </ul> <p>Channel 类型有:</p> <ul> <li>FileChannel, 文件操作</li> <li>DatagramChannel, UDP 操作</li> <li>SocketChannel, TCP 操作</li> <li>ServerSocketChannel, TCP 操作, 使用在服务器端.</li> </ul> <p>这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO.</p> <p>基本的 Channel 使用例子:</p> <pre> <code class="language-erlang">public static void main( String[] args ) throws Exception { RandomAccessFile aFile = new RandomAccessFile("/Users/xiongyongshun/settings.xml", "rw"); FileChannel inChannel = aFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); while (bytesRead != -1) { buf.flip(); while(buf.hasRemaining()){ System.out.print((char) buf.get()); } buf.clear(); bytesRead = inChannel.read(buf); } aFile.close(); }</code></pre> <h3><strong>FileChannel</strong></h3> <p>FileChannel 是操作文件的Channel, 我们可以通过 FileChannel 从一个文件中读取数据, 也可以将数据写入到文件中.</p> <p>注意 , FileChannel 不能设置为非阻塞模式.</p> <p><strong>打开 FileChannel</strong></p> <pre> <code class="language-erlang">RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel();</code></pre> <p><strong>从 FileChannel 中读取数据</strong></p> <pre> <code class="language-erlang">ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf);</code></pre> <p><strong>写入数据</strong></p> <pre> <code class="language-erlang">String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while(buf.hasRemaining()) { channel.write(buf); }</code></pre> <p><strong>关闭</strong></p> <p>当我们对 FileChannel 的操作完成后, 必须将其关闭</p> <pre> <code class="language-erlang">channel.close();</code></pre> <p><strong>设置 position</strong></p> <pre> <code class="language-erlang">long pos channel.position(); channel.position(pos +123);</code></pre> <p><strong>文件大小</strong></p> <p>我们可以通过 channel.size()获取关联到这个 Channel 中的文件的大小. <strong>注意</strong> , 这里返回的是文件的大小, 而不是 Channel 中剩余的元素个数.</p> <p><strong>截断文件</strong></p> <pre> <code class="language-erlang">channel.truncate(1024);</code></pre> <p>将文件的大小截断为1024字节.</p> <p><strong>强制写入</strong></p> <p>我们可以强制将缓存的未写入的数据写入到文件中:</p> <pre> <code class="language-erlang">channel.force(true);</code></pre> <h3><strong>SocketChannel</strong></h3> <p>SocketChannel 是一个客户端用来进行 TCP 连接的 Channel.</p> <p>创建一个 SocketChannel 的方法有两种:</p> <ul> <li>打开一个 SocketChannel, 然后将其连接到某个服务器中</li> <li>当一个 ServerSocketChannel 接受到连接请求时, 会返回一个 SocketChannel 对象.</li> </ul> <p><strong>打开 SocketChannel</strong></p> <pre> <code class="language-erlang">SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("http://example.com", 80));</code></pre> <p><strong>关闭</strong></p> <pre> <code class="language-erlang">socketChannel.close();</code></pre> <p><strong>读取数据</strong></p> <pre> <code class="language-erlang">ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = socketChannel.read(buf);</code></pre> <p>如果 read()返回 <strong>-1</strong> , 那么表示连接中断了.</p> <p><strong>写入数据</strong></p> <pre> <code class="language-erlang">String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while(buf.hasRemaining()) { channel.write(buf); }</code></pre> <p><strong>非阻塞模式</strong></p> <p>我们可以设置 SocketChannel 为异步模式, 这样我们的 connect, read, write 都是异步的了.</p> <p><strong>连接</strong></p> <pre> <code class="language-erlang">socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("http://example.com", 80)); while(! socketChannel.finishConnect() ){ //wait, or do something else... }</code></pre> <p>在异步模式中, 或许连接还没有建立, connect 方法就返回了, 因此我们需要检查当前是否是连接到了主机, 因此通过一个 while 循环来判断.</p> <p><strong>读写</strong></p> <p>在异步模式下, 读写的方式是一样的.</p> <p>在读取时, 因为是异步的, 因此我们必须检查 read 的返回值, 来判断当前是否读取到了数据.</p> <h3><strong>ServerSocketChannel</strong></h3> <p>ServerSocketChannel 顾名思义, 是用在服务器为端的, 可以监听客户端的 TCP 连接, 例如:</p> <pre> <code class="language-erlang">ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //do something with socketChannel... }</code></pre> <p><strong>打开 关闭</strong></p> <pre> <code class="language-erlang">ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.close();</code></pre> <p><strong>监听连接</strong></p> <p>我们可以使用ServerSocketChannel.accept()方法来监听客户端的 TCP 连接请求, accept()方法会阻塞, 直到有连接到来, 当有连接时, 这个方法会返回一个 SocketChannel 对象:</p> <pre> <code class="language-erlang">while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //do something with socketChannel... }</code></pre> <p><strong>非阻塞模式</strong></p> <p>在非阻塞模式下, accept()是非阻塞的, 因此如果此时没有连接到来, 那么 accept()方法会返回null:</p> <pre> <code class="language-erlang">ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); if(socketChannel != null){ //do something with socketChannel... } }</code></pre> <h3><strong>DatagramChannel</strong></h3> <p>DatagramChannel 是用来处理 UDP 连接的.</p> <p><strong>打开</strong></p> <pre> <code class="language-erlang">DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9999));</code></pre> <p><strong>读取数据</strong></p> <pre> <code class="language-erlang">ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); channel.receive(buf);</code></pre> <p><strong>发送数据</strong></p> <pre> <code class="language-erlang">String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("example.com", 80));</code></pre> <p><strong>连接到指定地址</strong></p> <p>因为 UDP 是非连接的, 因此这个的 connect 并不是向 TCP 一样真正意义上的连接, 而是它会讲 DatagramChannel 锁住, 因此我们仅仅可以从指定的地址中读取或写入数据.</p> <pre> <code class="language-erlang">channel.connect(new InetSocketAddress("example.com", 80));</code></pre> <h2><strong>Java NIO Buffer</strong></h2> <p>当我们需要与 NIO Channel 进行交互时, 我们就需要使用到 NIO Buffer, 即数据从 Buffer读取到 Channel 中, 并且从 Channel 中写入到 Buffer 中.</p> <p>实际上, 一个 Buffer 其实就是一块内存区域, 我们可以在这个内存区域中进行数据的读写. NIO Buffer 其实是这样的内存块的一个封装, 并提供了一些操作方法让我们能够方便地进行数据的读写.</p> <p>Buffer 类型有:</p> <ul> <li>ByteBuffer</li> <li>CharBuffer</li> <li>DoubleBuffer</li> <li>FloatBuffer</li> <li>IntBuffer</li> <li>LongBuffer</li> <li>ShortBuffer</li> </ul> <p>这些 Buffer 覆盖了能从 IO 中传输的所有的 Java 基本数据类型.</p> <h3><strong>NIO Buffer 的基本使用</strong></h3> <p>使用 NIO Buffer 的步骤如下:</p> <ul> <li>将数据写入到 Buffer 中.</li> <li>调用 Buffer.flip()方法, 将 NIO Buffer 转换为读模式.</li> <li>从 Buffer 中读取数据</li> <li>调用 Buffer.clear() 或 Buffer.compact()方法, 将 Buffer 转换为写模式.</li> </ul> <p>当我们将数据写入到 Buffer 中时, Buffer 会记录我们已经写了多少的数据, 当我们需要从 Buffer 中读取数据时, 必须调用 Buffer.flip()将 Buffer 切换为读模式.</p> <p>一旦读取了所有的 Buffer 数据, 那么我们必须清理 Buffer, 让其从新可写, 清理 Buffer 可以调用 Buffer.clear() 或 Buffer.compact().</p> <p>例如:</p> <pre> <code class="language-erlang">/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class Test { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.put(12345678); intBuffer.put(2); intBuffer.flip(); System.err.println(intBuffer.get()); System.err.println(intBuffer.get()); } }</code></pre> <p>上述中, 我们分配两个单位大小的 IntBuffer, 因此它可以写入两个 int 值.</p> <p>我们使用 put 方法将 int 值写入, 然后使用 flip 方法将 buffer 转换为读模式, 然后连续使用 get 方法从 buffer 中获取这两个 int 值.</p> <p>每当调用一次 get 方法读取数据时, buffer 的读指针都会向前移动一个单位长度(在这里是一个 int 长度)</p> <h3><strong>Buffer 属性</strong></h3> <p>一个 Buffer 有三个属性:</p> <ul> <li>capacity</li> <li>position</li> <li>limit</li> </ul> <p>其中 <strong>position</strong> 和 <strong>limit</strong> 的含义与 Buffer 处于读模式或写模式有关, 而 capacity 的含义与 Buffer 所处的模式无关.</p> <p><strong>Capacity</strong></p> <p>一个内存块会有一个固定的大小, 即容量(capacity), 我们最多写入 <strong>capacity</strong> 个单位的数据到 Buffer 中, 例如一个 DoubleBuffer, 其 Capacity 是100, 那么我们最多可以写入100个 double 数据.</p> <p><strong>Position</strong></p> <p>当从一个 Buffer 中写入数据时, 我们是从 Buffer 的一个确定的位置(position)开始写入的. 在最初的状态时, position 的值是0. 每当我们写入了一个单位的数据后, position 就会递增一.</p> <p>当我们从 Buffer 中读取数据时, 我们也是从某个特定的位置开始读取的. 当我们调用了 filp()方法将 Buffer 从写模式转换到读模式时, position 的值会自动被设置为0, 每当我们读取一个单位的数据, position 的值递增1.</p> <p>position表示了读写操作的位置指针.</p> <p><strong>limit</strong></p> <p>limit - position 表示此时还可以写入/读取多少单位的数据.<br> 例如在写模式, 如果此时 limit 是10, position 是2, 则表示已经写入了2个单位的数据, 还可以写入 10 - 2 = 8 个单位的数据.</p> <p><strong>例子:</strong></p> <pre> <code class="language-erlang">public class Test { public static void main(String args[]) { IntBuffer intBuffer = IntBuffer.allocate(10); intBuffer.put(10); intBuffer.put(101); System.err.println("Write mode: "); System.err.println("\tCapacity: " + intBuffer.capacity()); System.err.println("\tPosition: " + intBuffer.position()); System.err.println("\tLimit: " + intBuffer.limit()); intBuffer.flip(); System.err.println("Read mode: "); System.err.println("\tCapacity: " + intBuffer.capacity()); System.err.println("\tPosition: " + intBuffer.position()); System.err.println("\tLimit: " + intBuffer.limit()); } }</code></pre> <p>这里我们首先写入两个 int 值, 此时 capacity = 10, position = 2, limit = 10.</p> <p>然后我们调用 flip 转换为读模式, 此时 capacity = 10, position = 0, limit = 2;</p> <h3><strong>分配 Buffer</strong></h3> <p>为了获取一个 Buffer 对象, 我们首先需要分配内存空间. 每个类型的 Buffer 都有一个 allocate()方法, 我们可以通过这个方法分配 Buffer:</p> <pre> <code class="language-erlang">ByteBuffer buf = ByteBuffer.allocate(48);</code></pre> <p>这里我们分配了48 * sizeof(Byte)字节的内存空间.</p> <pre> <code class="language-erlang">CharBuffer buf = CharBuffer.allocate(1024);</code></pre> <p>这里我们分配了大小为1024个字符的 Buffer, 即 这个 Buffer 可以存储1024 个 Char, 其大小为 1024 * 2 个字节.</p> <h3><strong>关于 Direct Buffer 和 Non-Direct Buffer 的区别</strong></h3> <p>Direct Buffer:</p> <ul> <li>所分配的内存不在 JVM 堆上, 不受 GC 的管理.(但是 Direct Buffer 的 Java 对象是由 GC 管理的, 因此当发生 GC, 对象被回收时, Direct Buffer 也会被释放)</li> <li>因为 Direct Buffer 不在 JVM 堆上分配, 因此 Direct Buffer 对应用程序的内存占用的影响就不那么明显(实际上还是占用了这么多内存, 但是 JVM 不好统计到非 JVM 管理的内存.)</li> <li>申请和释放 Direct Buffer 的开销比较大. 因此正确的使用 Direct Buffer 的方式是在初始化时申请一个 Buffer, 然后不断复用此 buffer, 在程序结束后才释放此 buffer.</li> <li>使用 Direct Buffer 时, 当进行一些底层的系统 IO 操作时, 效率会比较高, 因为此时 JVM 不需要拷贝 buffer 中的内存到中间临时缓冲区中.</li> </ul> <p>Non-Direct Buffer:</p> <ul> <li>直接在 JVM 堆上进行内存的分配, 本质上是 byte[] 数组的封装.</li> <li>因为 Non-Direct Buffer 在 JVM 堆中, 因此当进行操作系统底层 IO 操作中时, 会将此 buffer 的内存复制到中间临时缓冲区中. 因此 Non-Direct Buffer 的效率就较低.</li> </ul> <h3><strong>写入数据到 Buffer</strong></h3> <pre> <code class="language-erlang">int bytesRead = inChannel.read(buf); //read into buffer. buf.put(127);</code></pre> <h3><strong>从 Buffer 中读取数据</strong></h3> <pre> <code class="language-erlang">//read from buffer into channel. int bytesWritten = inChannel.write(buf); byte aByte = buf.get();</code></pre> <h3><strong>重置 position</strong></h3> <p>Buffer.rewind()方法可以重置 position 的值为0, 因此我们可以重新读取/写入 Buffer 了.</p> <p>如果是读模式, 则重置的是读模式的 position, 如果是写模式, 则重置的是写模式的 position.</p> <p>例如:</p> <pre> <code class="language-erlang">/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class Test { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.put(1); intBuffer.put(2); System.err.println("position: " + intBuffer.position()); intBuffer.rewind(); System.err.println("position: " + intBuffer.position()); intBuffer.put(1); intBuffer.put(2); System.err.println("position: " + intBuffer.position()); intBuffer.flip(); System.err.println("position: " + intBuffer.position()); intBuffer.get(); intBuffer.get(); System.err.println("position: " + intBuffer.position()); intBuffer.rewind(); System.err.println("position: " + intBuffer.position()); } }</code></pre> <p>rewind() 主要针对于读模式. 在读模式时, 读取到 limit 后, 可以调用 rewind() 方法, 将读 position 置为0.</p> <h3><strong>关于 mark()和 reset()</strong></h3> <p>我们可以通过调用 Buffer.mark()将当前的 position 的值保存起来, 随后可以通过调用 Buffer.reset()方法将 position 的值回复回来.</p> <p>例如:</p> <pre> <code class="language-erlang">/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class Test { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.put(1); intBuffer.put(2); intBuffer.flip(); System.err.println(intBuffer.get()); System.err.println("position: " + intBuffer.position()); intBuffer.mark(); System.err.println(intBuffer.get()); System.err.println("position: " + intBuffer.position()); intBuffer.reset(); System.err.println("position: " + intBuffer.position()); System.err.println(intBuffer.get()); } }</code></pre> <p>这里我们写入两个 int 值, 然后首先读取了一个值. 此时读 position 的值为1.</p> <p>接着我们调用 mark() 方法将当前的 position 保存起来(在读模式, 因此保存的是读的 position), 然后再次读取, 此时 position 就是2了.</p> <p>接着使用 reset() 恢复原来的读 position, 因此读 position 就为1, 可以再次读取数据.</p> <h3><strong>flip, rewind 和 clear 的区别</strong></h3> <p><strong>flip</strong></p> <p>方法源码:</p> <pre> <code class="language-erlang">public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }</code></pre> <p>Buffer 的读/写模式共用一个 position 和 limit 变量.</p> <p>当从写模式变为读模式时, 原先的 <strong>写 position</strong> 就变成了读模式的 <strong>limit</strong> .</p> <p><strong>rewind</strong></p> <p>方法源码</p> <pre> <code class="language-erlang">public final Buffer rewind() { position = 0; mark = -1; return this; }</code></pre> <p>rewind, 即倒带, 这个方法仅仅是将 position 置为0.</p> <p><strong>clear</strong></p> <p>方法源码:</p> <pre> <code class="language-erlang">public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }</code></pre> <p>根据源码我们可以知道, clear 将 positin 设置为0, 将 limit 设置为 capacity.</p> <p>clear 方法使用场景:</p> <ul> <li>在一个已经写满数据的 buffer 中, 调用 clear, 可以从头读取 buffer 的数据.</li> <li>为了将一个 buffer 填充满数据, 可以调用 clear, 然后一直写入, 直到达到 limit.</li> </ul> <p><strong>例子:</strong></p> <pre> <code class="language-erlang">IntBuffer intBuffer = IntBuffer.allocate(2); intBuffer.flip(); System.err.println("position: " + intBuffer.position()); System.err.println("limit: " + intBuffer.limit()); System.err.println("capacity: " + intBuffer.capacity()); // 这里不能读, 因为 limit == position == 0, 没有数据. //System.err.println(intBuffer.get()); intBuffer.clear(); System.err.println("position: " + intBuffer.position()); System.err.println("limit: " + intBuffer.limit()); System.err.println("capacity: " + intBuffer.capacity()); // 这里可以读取数据了, 因为 clear 后, limit == capacity == 2, position == 0, // 即使我们没有写入任何的数据到 buffer 中. System.err.println(intBuffer.get()); // 读取到0 System.err.println(intBuffer.get()); // 读取到0</code></pre> <h3><strong>Buffer 的比较</strong></h3> <p>我们可以通过 equals() 或 compareTo() 方法比较两个 Buffer, 当且仅当如下条件满足时, 两个 Buffer 是相等的:</p> <ul> <li>两个 Buffer 是相同类型的</li> <li>两个 Buffer 的剩余的数据个数是相同的</li> <li>两个 Buffer 的剩余的数据都是相同的.</li> </ul> <p>通过上述条件我们可以发现, 比较两个 Buffer 时, 并不是 Buffer 中的每个元素都进行比较, 而是比较 Buffer 中剩余的元素.</p> <h2><strong>Selector</strong></h2> <p>Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.</p> <p>使用 Selector 的图解如下:</p> <p><img src="https://simg.open-open.com/show/0d4968d8359eaf4cfb3f3f7376dd74a1.png"></p> <p>为了使用 Selector, 我们首先需要将 Channel 注册到 Selector 中, 随后调用 Selector 的 select()方法, 这个方法会阻塞, 直到注册在 Selector 中的 Channel 发送可读写事件. 当这个方法返回后, 当前的这个线程就可以处理 Channel 的事件了.</p> <h3><strong>创建选择器</strong></h3> <p>通过 Selector.open()方法, 我们可以创建一个选择器:</p> <pre> <code class="language-erlang">Selector selector = Selector.open();</code></pre> <h3><strong>将 Channel 注册到选择器中</strong></h3> <p>为了使用选择器管理 Channel, 我们需要将 Channel 注册到选择器中:</p> <pre> <code class="language-erlang">channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);</code></pre> <p>注意 , 如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的, 即channel.configureBlocking(false);</p> <p>因为 Channel 必须要是非阻塞的, 因此 FileChannel 是不能够使用选择器的, 因为 FileChannel 都是阻塞的.</p> <p>注意到, 在使用 Channel.register()方法时, 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣, 这些事件有:</p> <ul> <li>Connect, 即连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT</li> <li>Accept, 即确认事件, 对应于SelectionKey.OP_ACCEPT</li> <li>Read, 即读事件, 对应于SelectionKey.OP_READ, 表示 buffer 可读.</li> <li>Write, 即写事件, 对应于SelectionKey.OP_WRITE, 表示 buffer 可写.</li> </ul> <p>一个 Channel发出一个事件也可以称为** 对于某个事件, Channel 准备好了 . 因此一个 Channel 成功连接到了另一个服务器也可以被称为 connect ready <strong> </strong></p> <p>我们可以使用或运算</p> <p>|**来组合多个事件, 例如:</p> <pre> <code class="language-erlang">int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;</code></pre> <p>注意, 一个 Channel 仅仅可以被注册到一个 Selector 一次, 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey 的 interest set . 例如:</p> <pre> <code class="language-erlang">channel.register(selector, SelectionKey.OP_READ); channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);</code></pre> <p>上面的 channel 注册到同一个 Selector 两次了, 那么第二次的注册其实就是相当于更新这个 Channel 的 interest set 为 SelectionKey.OP_READ | SelectionKey.OP_WRITE.</p> <h3><strong>关于 SelectionKey</strong></h3> <p>如上所示, 当我们使用 register 注册一个 Channel 时, 会返回一个 SelectionKey 对象, 这个对象包含了如下内容:</p> <ul> <li>interest set, 即我们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set.</li> <li>ready set</li> <li>channel</li> <li>selector</li> <li>attached object, 可选的附加对象</li> </ul> <p><strong>interest set</strong></p> <p>我们可以通过如下方式获取 interest set:</p> <pre> <code class="language-erlang">int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;</code></pre> <p><strong>ready set</strong></p> <p>代表了 Channel 所准备好了的操作.</p> <p>我们可以像判断 interest set 一样操作 Ready set, 但是我们还可以使用如下方法进行判断:</p> <pre> <code class="language-erlang">int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();</code></pre> <p><strong>Channel 和 Selector</strong></p> <p>我们可以通过 SelectionKey 获取相对应的 Channel 和 Selector:</p> <pre> <code class="language-erlang">Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();</code></pre> <p><strong>Attaching Object</strong></p> <p>我们可以在selectionKey中附加一个对象:</p> <pre> <code class="language-erlang">selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();</code></pre> <p>或者在注册时直接附加:</p> <pre> <code class="language-erlang">SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);</code></pre> <h3><strong>通过 Selector 选择 Channel</strong></h3> <p>我们可以通过 Selector.select()方法获取对某件事件准备好了的 Channel, 即如果我们在注册 Channel 时, 对其的 <strong>可写</strong> 事件感兴趣, 那么当 select()返回时, 我们就可以获取 Channel 了.</p> <p>注意 , select()方法返回的值表示有多少个 Channel 可操作.</p> <h3><strong>获取可操作的 Channel</strong></h3> <p>如果 select()方法返回值表示有多个 Channel 准备好了, 那么我们可以通过 Selected key set 访问这个 Channel:</p> <pre> <code class="language-erlang">Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }</code></pre> <p>注意, 在每次迭代时, 我们都调用 "keyIterator.remove()" 将这个 key 从迭代器中删除, 因为 select() 方法仅仅是简单地将就绪的 IO 操作放到 selectedKeys 集合中, 因此如果我们从 selectedKeys 获取到一个 key, 但是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中.</p> <p>例如此时我们收到 OP_ACCEPT 通知, 然后我们进行相关处理, 但是并没有将这个 Key 从 SelectedKeys 中删除, 那么下一次 select() 返回时 我们还可以在 SelectedKeys 中获取到 OP_ACCEPT 的 key.</p> <p>注意, 我们可以动态更改 SekectedKeys 中的 key 的 interest set.</p> <p>例如在 OP_ACCEPT 中, 我们可以将 interest set 更新为 OP_READ, 这样 Selector 就会将这个 Channel 的 读 IO 就绪事件包含进来了.</p> <h3><strong>Selector 的基本使用流程</strong></h3> <p>我们再来回顾一下 Java NIO 中的 Selector 的使用流程:</p> <ol> <li>通过 Selector.open() 打开一个 Selector.</li> <li>将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)</li> <li>不断重复:</li> </ol> <ul> <li>调用 select() 方法</li> <li>调用 selector.selectedKeys() 获取 selected keys</li> <li>迭代每个 selected key:</li> <li>1) 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)</li> <li>2) 判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.</li> <li>3) 根据需要更改 selected key 的监听事件.</li> <li>4) 将已经处理过的 key 从 selected keys 集合中删除.</li> </ul> <h3><strong>关闭 Selector</strong></h3> <p>当调用了 Selector.close()方法时, 我们其实是关闭了 Selector 本身并且将所有的 SelectionKey 失效, 但是并不会关闭 Channel.</p> <h3><strong>完整的 Selector 例子</strong></h3> <p> </p> <p>来自:http://www.cnblogs.com/xys1228/p/6011412.html</p> <p> </p>
本文由用户 gtot2266 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
转载本站原创文章,请注明出处,并保留原始链接、图片水印。
本站是一个以用户分享为主的开源技术平台,欢迎各类分享!