| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
381355763
7年前发布

Java NIO之【Scalable IO in Java】

   <p>看完了并发网的NIO教程,是否有种意犹未尽的感觉。正常情况下,答案应该是肯定的。那我们下面来看下Doug Lea大神写的 Scalable IO in Java ,直接可以下载英文版pdf。这边就当边学习边翻译了。</p>    <h3>网络服务</h3>    <p>大部分网路服务有着相同的体系:</p>    <ul>     <li>读取请求(Read request)</li>     <li>对请求进行解码(Decode request)</li>     <li>处理业务逻辑(Process service)</li>     <li>对返回值进行编码(Encode reply)</li>     <li>发送返回值(Send reply)</li>    </ul>    <p>下面我们来看下传统的设计模型:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/5e5d79c891d27dd941609e4cde490568.png"></p>    <p>其中,每一个handler有可能都要新起一个线程去执行。用伪代码模拟如下:</p>    <pre>  <code class="language-java">public classServce{      publicstaticvoidmain(String[] args)throwsIOException{          ServerSocket ss = new ServerSocket(1234);          while (!Thread.interrupted()) {              new Thread(new Handler(ss.accept())).start();          }      }        static classHandlerimplementsRunnable{          final Socket socket;          Handler(Socket socket) {              this.socket = socket;          }            @Override          publicvoidrun(){              try {                  byte[] input = new byte[1024];                  socket.getInputStream().read(input);                  byte[] output = process(input);                  socket.getOutputStream().write(output);              } catch (IOException e) {                  e.printStackTrace();              }          }            private byte[] process(byte[] cmd) {              return null;          }      }  }  </code></pre>    <p>从伪代码可以看出传统I/O模型的雏形,需要为每一个接收到的socket连接新建一个线程去执行具体的业务逻辑。</p>    <h3>可扩展性的目标</h3>    <p>首先,肯定是不满意上面传统的I/O设计模型,才有接下来的讨论。无休止地新建线程去执行具体业务逻辑,最终无疑会拖垮整个系统。当然,也很容易想到,可以用线程池,但是这样虽然可以限制线程数量,但是并发数也因此被限制了,所以并不是解决之道。那我们就来看下可扩展性I/O的目标是什么:</p>    <ul>     <li>高负载情况下的优雅降级</li>     <li>硬件的升级能持续地给系统带来性能提升</li>     <li>当然也包含可用性和性能的目标:低延迟、高负载等</li>    </ul>    <h3>分治法(Divide and Conquer)</h3>    <p>分治法一般是解决可扩展性的最好的途径。将处理流程分成一些小的任务,每一个任务都包含一个非阻塞操作。当任务准备好的时候去执行它。这里,一个I/O事件通常被作为触发器。比如下面:</p>    <p><img src="https://simg.open-open.com/show/e6109984f26dc4f84241ae8d9dbf8341.png"></p>    <p>说实话,上面这一话配上这张图,不是很能理解。被分成的小任务是整个handler,还是比如说read这样一个操作。感觉是把handler拆成一个个小任务,再往下学吧,应该会越来越清晰。</p>    <p>java.nio提供如下基本的机制:</p>    <ul>     <li>非阻塞的读和写</li>     <li>与感兴趣的I/O事件相关联的任务分配机制</li>    </ul>    <h3>事件驱动设计</h3>    <p>一系列事件驱动设计使得无限可能。这种方式通常比其他方案更有效,原因如下:</p>    <ul>     <li>占用资源少:不需要为每个客户端开启一个新线程</li>     <li>开销少:减少上下文切换的开销,减少锁的使用</li>    </ul>    <p>但是,通常也更难编码,原因如下:</p>    <ul>     <li>必须拆分成许多小的非阻塞单元,但是无法消除所有的阻塞动作,比如说GC、页错误等</li>     <li>必须持续追踪服务的逻辑状态</li>    </ul>    <h3>Reactor模式</h3>    <p>Reactor模式有如下几个特征:</p>    <ul>     <li>Reactor通过调度相应的处理程序来相应I/O事件</li>     <li>处理程序执行非阻塞操作</li>     <li>通过绑定处理程序来管理事件。</li>    </ul>    <p>我们先来看下单个线程版本的模型图:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/7222ffd6927a5f78febaee6a140bd0e4.png"></p>    <p>java.nio中的Channel、Buffer、Selector、SelectionKey类可以支持该模型。上图如果第一眼不能很好地理解的话,先来看下代码,涉及到两个类。</p>    <pre>  <code class="language-java">public classReactorimplementsRunnable{      final Selector selector;      final ServerSocketChannel serverSocketChannel;        Reactor(int port) throws IOException {          // 初始化ServerSocketChannel,以非阻塞模式运行          serverSocketChannel = ServerSocketChannel.open();          serverSocketChannel.bind(new InetSocketAddress(port));          serverSocketChannel.configureBlocking(false);            // 初始化Selector          selector = Selector.open();          // 将ServerSocketChannel注册到Selector上          SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);          // 在SelectionKey上绑定一个附属对象Acceptor          selectionKey.attach(new Acceptor());      }        @Override      publicvoidrun(){          try {              while (!Thread.interrupted()) {                  // 阻塞直至事件就绪                  selector.select();                  Set selected = selector.selectedKeys();                  Iterator it = selected.iterator();                  while (it.hasNext()) {                      // 分发                      dispatch((SelectionKey)(it.next()));                  }                  // 需要自己清除selectedKeys                  selected.clear();              }          }catch (IOException ex) {            }      }        voiddispatch(SelectionKey k){          /**           * 获取SelectionKey中的attachment,并执行该attachment的run()方法           * 拿第一个到达的socket连接来看,该attachment为一个Acceptor实例           */          Runnable r = (Runnable)(k.attachment());          if (r != null) {              r.run();          }      }        classAcceptorimplementsRunnable{          publicvoidrun(){              try {                  // 获取新连接的SocketChannel                  SocketChannel socketChannel = serverSocketChannel.accept();                  if (socketChannel != null) {                      // 具体的处理逻辑                      new Handler(selector, socketChannel);                  }              } catch (IOException ex) {                }          }      }  }  </code></pre>    <pre>  <code class="language-java">public classHandlerimplementsRunnable{      final SocketChannel socket;      final SelectionKey sk;      ByteBuffer input = ByteBuffer.allocate(1024);      ByteBuffer output = ByteBuffer.allocate(1024);      static final int READING = 0, SENDING = 1;      int state = READING;        Handler(Selector sel, SocketChannel c) throws IOException {          socket = c;          socket.configureBlocking(false);          // 继续在Selector上注册读事件,此时attachment为当前Handler实例          sk = socket.register(sel, SelectionKey.OP_READ, this);          // 使选择器上的第一个还没有返回的选择操作立即返回          sel.wakeup();      }        booleaninputIsComplete(){          return true;      }      booleanoutputIsComplete(){          return true;      }      voidprocess(){          System.out.println("Handle processing...");      }        @Override          publicvoidrun(){              try {                  if (state == READING) {                      read();                  } else if (state == SENDING) {                      send();                  }              } catch (IOException ex) { /* ... */ }          }        voidread()throwsIOException{          System.out.println("Handle reading...");          socket.read(input);          if (inputIsComplete()) {              process();              state = SENDING;              // 在SelectionKey上注册写事件              sk.interestOps(SelectionKey.OP_WRITE);          }      }      voidsend()throwsIOException{          System.out.println("Handle writing...");          socket.write(output);          if (outputIsComplete()) {              //write完就结束了, 关闭select key              sk.cancel();          }      }  }  </code></pre>    <p>结合模型图和代码,直观的感受是单个线程可以同时处理多个客户端请求了。下面列举下Reactor模式的一些概念:</p>    <ul>     <li>Reactor:负责响应I/O事件,当检测到一个新的事件,将其发送给相应的Handler去处理</li>     <li>Handler:负责处理非阻塞的行为,同时将handler与事件绑定</li>    </ul>    <p>Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。由于只有单个线程,所以handler中的业务需要能够快速处理完。当然,还能再改进,可以将具体的业务逻辑放到单独的线程池中去跑,这儿就不实习了。同时,NIO暂时也就看到这里,主要是了解下相关知识,为下面学习Netty做个准备。</p>    <p> </p>    <p>来自:http://bboyjing.github.io/2017/04/05/Java-NIO之【Scalable-IO-in-Java】/</p>    <p> </p>    
 本文由用户 381355763 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
 转载本站原创文章,请注明出处,并保留原始链接、图片水印。
 本站是一个以用户分享为主的开源技术平台,欢迎各类分享!
 本文地址:https://www.open-open.com/lib/view/open1491448603429.html
Java IO NIO Java开发