Java NIO基本使用
NIO是Java提供的非阻塞I/O API.
非阻塞的意义在于可以使用一个线程对大量的数据连接进行处理,非常适用于"短数据长连接"的应用场景,例如即时通讯软件.
在一个阻塞C/S系统中,服务器要为每一个客户连接开启一个线程阻塞等待客户端发送的消息.若使用非阻塞技术,服务器可以使用一个线程对连接进行轮 询,无须阻塞等待.这大大减少了内存资源的浪费,也避免了服务器在客户线程中不断切换带来的CPU消耗,服务器对CPU的有效使用率大大提高.
其核心概念包括Channel,Selector,SelectionKey,Buffer.
Channel是I/O通道,可以向其注册Selector,应用成功可以通过select操作获取当前通道已经准备好的可以无阻塞执行的操作.这由SelectionKey表示.
SelectionKey的常量字段SelectionKey.OP_***分别对应Channel的几种操作例如connect(),accept(),read(),write().
select操作后得到SelectionKey.OP_WRITE或者READ即可在Channel上面无阻塞调用read和write方 法,Channel的读写操作均需要通过Buffer进行.即读是讲数据从通道中读入Buffer然后做进一步处理.写需要先将数据写入Buffer然后 通道接收Buffer.
下面是一个使用NIO的基本C/S示例.该示例只为显示如何使用基本的API而存在,其代码的健壮性,合理性都不具参考价值.
这个示例,实现一个简单的C/S,客户端想服务器端发送消息,服务器将收到的消息打印到控制台.现实的应用中需要定义发送数据使用的协议,以帮助服 务器解析消息.本示例只是无差别的使用默认编码将收到的字节转换字符并打印.通过改变初始分配的ByteBuffer的容量,可以看到打印消息的变化.容 量越小,对一条消息的处理次数就越多,容量大就可以在更少的循环次数内读完整个消息.所以真是的应用场景,要考虑适当的缓存大小以提高效率.
首先是Server
package hadix.demo.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * User: hAdIx * Date: 11-11-2 * Time: 上午11:26 */ public class Server { private Selector selector; private ByteBuffer readBuffer = ByteBuffer.allocate(8);//调整缓存的大小可以看到打印输出的变化 private Map<SocketChannel, byte[]> clientMessage = new ConcurrentHashMap<>(); public void start() throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.bind(new InetSocketAddress("localhost", 8001)); selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.currentThread().isInterrupted()) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { accept(key); } else if (key.isReadable()) { read(key); } keyIterator.remove(); } } } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Clear out our read buffer so it's ready for new data this.readBuffer.clear(); // Attempt to read off the channel int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. key.cancel(); socketChannel.close(); clientMessage.remove(socketChannel); return; } byte[] bytes = clientMessage.get(socketChannel); if (bytes == null) { bytes = new byte[0]; } if (numRead > 0) { byte[] newBytes = new byte[bytes.length + numRead]; System.arraycopy(bytes, 0, newBytes, 0, bytes.length); System.arraycopy(readBuffer.array(), 0, newBytes, bytes.length, numRead); clientMessage.put(socketChannel, newBytes); System.out.println(new String(newBytes)); } else { String message = new String(bytes); System.out.println(message); } } private void accept(SelectionKey key) throws IOException { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = ssc.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); System.out.println("a new client connected"); } public static void main(String[] args) throws IOException { System.out.println("server started..."); new Server().start(); } }然后是Client
package hadix.demo.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * User: hAdIx * Date: 11-11-2 * Time: 上午11:26 */ public class Client { public void start() throws IOException { SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false); sc.connect(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); sc.register(selector, SelectionKey.OP_CONNECT); Scanner scanner = new Scanner(System.in); while (true) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); System.out.println("keys=" + keys.size()); Iterator<SelectionKey> keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); if (key.isConnectable()) { sc.finishConnect(); sc.register(selector, SelectionKey.OP_WRITE); System.out.println("server connected..."); break; } else if (key.isWritable()) { System.out.println("please input message"); String message = scanner.nextLine(); ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes()); sc.write(writeBuffer); } } } } public static void main(String[] args) throws IOException { new Client().start(); } }
此外有一个代码写得更好的例子,非常值得参考.http://rox-xmlrpc.sourceforge.net/niotut/index.html
这个例子里面的客户端将消息发送给服务器,服务器收到后立即写回给客户端.例子中代码虽然也没有做有意义的处理,但是其结构比较合理,值得以此为基础进行现实应用的扩展开发.