| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx

Java NIO基本使用

8
Java HTML C/C++ Go 31371 次浏览

NIO是Java提供的非阻塞I/O API.

非阻塞的意义在于可以使用一个线程对大量的数据连接进行处理,非常适用于"短数据长连接"的应用场景,例如即时通讯软件.

在一个阻塞C/S系统中,服务器要为每一个客户连接开启一个线程阻塞等待客户端发送的消息.若使用非阻塞技术,服务器可以使用一个线程对连接进行轮 询,无须阻塞等待.这大大减少了内存资源的浪费,也避免了服务器在客户线程中不断切换带来的CPU消耗,服务器对CPU的有效使用率大大提高.

 

其核心概念包括Channel,Selector,SelectionKey,Buffer.

Channel是I/O通道,可以向其注册Selector,应用成功可以通过select操作获取当前通道已经准备好的可以无阻塞执行的操作.这由SelectionKey表示.

SelectionKey的常量字段SelectionKey.OP_***分别对应Channel的几种操作例如connect(),accept(),read(),write().

 

select操作后得到SelectionKey.OP_WRITE或者READ即可在Channel上面无阻塞调用read和write方 法,Channel的读写操作均需要通过Buffer进行.即读是讲数据从通道中读入Buffer然后做进一步处理.写需要先将数据写入Buffer然后 通道接收Buffer.

 

下面是一个使用NIO的基本C/S示例.该示例只为显示如何使用基本的API而存在,其代码的健壮性,合理性都不具参考价值.

这个示例,实现一个简单的C/S,客户端想服务器端发送消息,服务器将收到的消息打印到控制台.现实的应用中需要定义发送数据使用的协议,以帮助服 务器解析消息.本示例只是无差别的使用默认编码将收到的字节转换字符并打印.通过改变初始分配的ByteBuffer的容量,可以看到打印消息的变化.容 量越小,对一条消息的处理次数就越多,容量大就可以在更少的循环次数内读完整个消息.所以真是的应用场景,要考虑适当的缓存大小以提高效率.

首先是Server

package hadix.demo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * User: hAdIx
 * Date: 11-11-2
 * Time: 上午11:26
 */
public class Server {
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(8);//调整缓存的大小可以看到打印输出的变化
    private Map<SocketChannel, byte[]> clientMessage = new ConcurrentHashMap<>();

    public void start() throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("localhost", 8001));
        selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (!Thread.currentThread().isInterrupted()) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                }
                keyIterator.remove();
            }
        }
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            clientMessage.remove(socketChannel);
            return;
        }

        byte[] bytes = clientMessage.get(socketChannel);
        if (bytes == null) {
            bytes = new byte[0];
        }
        if (numRead > 0) {
            byte[] newBytes = new byte[bytes.length + numRead];
            System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
            System.arraycopy(readBuffer.array(), 0, newBytes, bytes.length, numRead);
            clientMessage.put(socketChannel, newBytes);
            System.out.println(new String(newBytes));
        } else {
            String message = new String(bytes);
            System.out.println(message);
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = ssc.accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("a new client connected");
    }


    public static void main(String[] args) throws IOException {
        System.out.println("server started...");
        new Server().start();
    }
}
然后是Client
package hadix.demo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * User: hAdIx
 * Date: 11-11-2
 * Time: 上午11:26
 */
public class Client {

    public void start() throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.connect(new InetSocketAddress("localhost", 8001));
        Selector selector = Selector.open();
        sc.register(selector, SelectionKey.OP_CONNECT);
        Scanner scanner = new Scanner(System.in);
        while (true) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            System.out.println("keys=" + keys.size());
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                if (key.isConnectable()) {
                    sc.finishConnect();
                    sc.register(selector, SelectionKey.OP_WRITE);
                    System.out.println("server connected...");
                    break;
                } else if (key.isWritable()) {

                    System.out.println("please input message");
                    String message = scanner.nextLine();
                    ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
                    sc.write(writeBuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new Client().start();
    }
}

 此外有一个代码写得更好的例子,非常值得参考.http://rox-xmlrpc.sourceforge.net/niotut/index.html

这个例子里面的客户端将消息发送给服务器,服务器收到后立即写回给客户端.例子中代码虽然也没有做有意义的处理,但是其结构比较合理,值得以此为基础进行现实应用的扩展开发.

16个答案

0
0

拜登给乌克兰“打气”后表态不派兵水泵乌克兰国防部等网站遭攻击关闭水泵厂世卫:奥密克戎亚变体传染性增30%水泵公司普京:从乌克兰边境撤出部分部队球阀官方回应女子穿和服被骂滚出大理 焊接球阀 全焊接球阀 直埋全焊接球阀 埋地全焊接球阀  水泵 离心泵 上海离心泵 水泵厂 水泵厂家 泵厂家 帕特 螺杆泵 离心泵厂家 污泥螺杆泵13 上海螺杆泵 球阀厂10 Fully Welded Ball Valve all Welded Ball Valve 全焊接阀门 焊接阀 全焊接球阀 焊接球阀 泵厂家  螺杆泵 帕特

0

给力

0

不错

0

767867689789

0

很好

0

一直没有搞清楚阻塞和非阻塞   同步和异步的区别

0
不错,学习了
0
恩 不错,
0

是真的嘛 非常的给力呀,真的非常的好

0
受教了!

0
不错,学习了
0
io中不是有bufferedReader
0
NIO里主要包括两个方面的内容吧,作者说的是其中的一个,就是在网络连接方面,借鉴了操作系统(类似epoll方式)来注册事件机制; 还有一方面的内容应该是文件I/O吧,原来是只能一个字节一个字节的读(读出来后自己可以做buffer),而NIO中可以批量一次性读出或写入,也是蛮重要的,哈哈。
0
need more solid examples
0
需要深入研究NIO