| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
dwd4
10年前发布

通过JAVA NIO实现Socket服务器与客户端功能

package niocommunicate;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Non-blocking NIO demo server listening on port 5555.
 *
 * <p>A selector loop runs on one pool thread and hands reads and writes to pool workers.
 * Every message received from a client is answered with the bytes of {@code "response"},
 * passed to the selector loop as the key attachment.
 */
public class Server {

    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    // CallerRunsPolicy: when all workers are busy and the queue is full the selector thread
    // runs the task itself. With the default abort policy a single rejected task would fall
    // into the selector loop's catch-all and shut the whole server down.
    private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    // Keys that currently have a read task in flight, indexed by the key's hash code, so the
    // selector loop does not submit a second read for the same connection.
    private static Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} if it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates the non-blocking server channel, binds it to port 5555 and registers it for
     * accept events. On failure the server is closed again.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Shuts the server down: stops the pool and closes the server channel and the selector.
     * Each resource is closed independently so a failure on one does not leak the other.
     */
    private void close() {
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drops one client connection: cancels its key (if any), forgets its in-flight marker
     * and closes the channel.
     *
     * @param client the client channel to close
     * @param key    the channel's selection key, may be {@code null}
     */
    private void closeClient(SocketChannel client, SelectionKey key) {
        if (key != null) {
            key.cancel();
            selectionKeyMap.remove(key.hashCode());
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. The loop runs until an unexpected
     * exception occurs, at which point the server is closed.
     */
    private void start() {
        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    while (true) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                handleSelectedKey(selectedKey);
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                selectionKeyMap.remove(selectedKey.hashCode());
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
        });
    }

    /**
     * Handles one ready key: dispatches a read task, dispatches a write task for the
     * attached response, or accepts a new client.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new client fails
     */
    private void handleSelectedKey(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // Submit at most one read task per connection at a time.
            if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            Object responseMessage = selectedKey.attachment();
            SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
            // Go back to read interest so the always-writable socket does not spin the loop.
            selectedKey.interestOps(SelectionKey.OP_READ);
            if (responseMessage != null) {
                threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;

        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                // NOTE(review): a non-blocking write may be partial; responses here are a few
                // bytes, so a single call is kept.
                client.write(ByteBuffer.wrap(responseByteData));
                // The key can already be gone if the connection was dropped meanwhile.
                SelectionKey key = client.keyFor(selector);
                String clientId = key == null ? "unknown" : String.valueOf(key.hashCode());
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client, client.keyFor(selector));
            }
        }
    }

    /**
     * Task that drains the data a client has sent and schedules the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }

        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                // read() returns -1 once the peer has closed. Without closing our side the
                // key stays readable forever and the selector loop resubmits reads endlessly.
                boolean endOfStream = len < 0;
                if (data.length > 0) {
                    System.out.println("接收到客户端[" + selectionKey.hashCode() + "]数据 :[" + new String(data) + "]");
                    // dosomthing
                    if (!endOfStream) {
                        byte[] response = "response".getBytes();
                        client.register(selector, SelectionKey.OP_WRITE, response);
                    }
                }
                if (endOfStream) {
                    System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                    closeClient(client, selectionKey);
                }
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                closeClient(client, selectionKey);
            } finally {
                // Allow the selector loop to submit the next read for this connection.
                selectionKeyMap.remove(selectionKey.hashCode());
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

package niocommunicate;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * Non-blocking NIO demo client that connects to port 5555 on the local host.
 *
 * <p>A background thread runs the selector loop. {@link #writeData(String)} hands a message
 * to that loop as the key attachment, and received data is printed to standard output.
 */
public class Client {

    // Body of every demo message; main() sends it twice between two copies of the counter.
    private static final String PAYLOAD =
            "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei";

    SocketChannel client;
    Selector selctor = getSelector();

    private volatile boolean run = true;

    private List<Object> messageQueue = new LinkedList<>();

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} if it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Starts a non-blocking connect to port 5555 on the local host and launches the selector
     * thread, then waits 200 ms to give the connection time to complete.
     */
    public Client() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
            client.register(selctor, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while (run) {
                        try {
                            if (selctor.select(20) == 0) {
                                continue;
                            }
                            Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                            while (iterator.hasNext()) {
                                SelectionKey selectionKey = iterator.next();
                                iterator.remove();
                                handleKey(selectionKey);
                            }
                        } catch (IOException e1) {
                            e1.printStackTrace();
                            close();
                        } catch (CancelledKeyException cancelled) {
                            // close() on another thread cancelled the key while it was being
                            // handled; the loop condition decides whether to carry on.
                        }
                    }
                } finally {
                    // The selector is closed by the thread that uses it, so close() never
                    // pulls it away in the middle of a select().
                    closeSelector();
                }
            }
        }).start();
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key on the selector thread: completes the connect, sends the
     * attached message, or prints received data.
     *
     * @param selectionKey the key reported ready by the selector
     * @throws IOException if finishing the connect or reading fails
     */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isConnectable()) {
            SocketChannel sc = (SocketChannel) selectionKey.channel();
            sc.finishConnect();
            sc.register(selctor, SelectionKey.OP_READ);
        } else if (selectionKey.isWritable()) {
            selectionKey.interestOps(SelectionKey.OP_READ);
            Object requestMessage = selectionKey.attachment();
            SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
            byte[] requestByteData = null;
            if (requestMessage instanceof byte[]) {
                requestByteData = (byte[]) requestMessage;
            } else if (requestMessage instanceof String) {
                requestByteData = ((String) requestMessage).getBytes();
                System.out.println("client send Message:[" + requestMessage + "]");
            } else {
                // The attachment may be null, so do not dereference it unconditionally.
                System.out.println("unsupport send Message Type"
                        + (requestMessage == null ? null : requestMessage.getClass()));
            }
            System.out.println("requestMessage:" + requestMessage);
            if (requestByteData != null && requestByteData.length > 0) {
                try {
                    // A non-blocking write may accept only part of the buffer; keep writing
                    // until the whole message has been handed to the socket.
                    ByteBuffer outgoing = ByteBuffer.wrap(requestByteData);
                    while (outgoing.hasRemaining()) {
                        writeSocketChannel.write(outgoing);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else if (selectionKey.isReadable()) {
            SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer tmp = ByteBuffer.allocate(1024);
            int len = readSocketChannel.read(tmp);
            if (len > 0) {
                byte[] data = Arrays.copyOf(tmp.array(), len);
                System.out.println("客户端接收到数据:[" + new String(data) + "]");
            } else if (len < 0) {
                // -1 means the server closed the connection. The key would otherwise stay
                // readable forever and spin this loop.
                System.out.println("server closed the connection");
                close();
            }
        }
    }

    /** Closes the selector, if one was opened, reporting but not propagating failures. */
    private void closeSelector() {
        if (selctor == null) {
            return;
        }
        try {
            selctor.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops the selector thread and closes the connection. Safe to call more than once and
     * from any thread.
     */
    public void close() {
        // Stop the loop first so it ends even if closing the channel fails.
        run = false;
        if (client == null) {
            return;
        }
        try {
            SelectionKey selectionKey = client.keyFor(selctor);
            if (selectionKey != null) {
                selectionKey.cancel();
            }
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Queues a message and hands each queued message to the selector thread as the key
     * attachment, pausing 40 ms after each one so the selector thread can pick it up before
     * the next message replaces the attachment.
     *
     * @param data the message to send
     */
    public void writeData(String data) {
        messageQueue.add(data);
        while (messageQueue.size() > 0) {
            Object firstSendData = messageQueue.remove(0);
            try {
                client.register(selctor, SelectionKey.OP_WRITE, firstSendData);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + PAYLOAD + PAYLOAD + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}

package niocommunicate;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Non-blocking NIO demo server listening on port 5555, with a response queue per client.
 *
 * <p>A selector loop runs on one pool thread and hands reads and writes to pool workers.
 * Read tasks append {@code "response"} plus the first characters of the received message to
 * the client's queue; the selector loop drains that queue when the socket is writable.
 */
public class Server {

    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    // CallerRunsPolicy: when all workers are busy and the queue is full the selector thread
    // runs the task itself. With the default abort policy a single rejected task would fall
    // into the selector loop's catch-all and shut the whole server down.
    private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    // Keys that currently have a read task in flight, indexed by the key's hash code.
    private Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
    // Pending responses per client, indexed by the key's hash code. The lists are
    // synchronized because workers append while the selector thread drains.
    private ConcurrentMap<Integer, List<Object>> responseMessageQueue = new ConcurrentHashMap<>();
    private volatile boolean run = true;
    private volatile boolean isClose = false;

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} if it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates the non-blocking server channel, binds it to port 5555 and registers it for
     * accept events. On failure the server is closed again.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * @return {@code true} once the server has been closed
     */
    public boolean isClose() {
        return isClose;
    }

    /**
     * Shuts the server down: stops the selector loop and the pool, then closes the server
     * channel and the selector. Each resource is closed independently so a failure on one
     * does not leak the other.
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the response queue for a client, creating it atomically when missing. A read
     * can arrive before the first writable event, so callers must never assume the queue
     * already exists.
     *
     * @param hashCode hash code of the client's selection key
     * @return the client's response queue, never {@code null}
     */
    private List<Object> responseQueueFor(int hashCode) {
        List<Object> existing = responseMessageQueue.get(hashCode);
        if (existing != null) {
            return existing;
        }
        List<Object> fresh = Collections.synchronizedList(new LinkedList<Object>());
        existing = responseMessageQueue.putIfAbsent(hashCode, fresh);
        return existing != null ? existing : fresh;
    }

    /**
     * Drops one client connection: cancels its key (if any), discards its in-flight marker
     * and pending responses, and closes the channel.
     *
     * @param client the client channel to close
     * @param key    the channel's selection key, may be {@code null}
     */
    private void closeClient(SocketChannel client, SelectionKey key) {
        if (key != null) {
            key.cancel();
            int hashCode = key.hashCode();
            selectionKeyMap.remove(hashCode);
            responseMessageQueue.remove(hashCode);
        }
        try {
            if (client != null) {
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. The loop runs until the server is closed
     * or an unexpected exception occurs, at which point the server is closed.
     */
    private void start() {
        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    while (run) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                handleSelectedKey(selectedKey);
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                int hashCode = selectedKey.hashCode();
                                selectionKeyMap.remove(hashCode);
                                responseMessageQueue.remove(hashCode);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
        });
    }

    /**
     * Handles one ready key: dispatches a read task, drains the client's pending responses
     * into write tasks, or accepts a new client.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new client fails
     */
    private void handleSelectedKey(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // Submit at most one read task per connection at a time.
            if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
            // Go back to read interest so the always-writable socket does not spin the loop.
            selectedKey.interestOps(SelectionKey.OP_READ);
            List<Object> list = responseQueueFor(selectedKey.hashCode());
            // Only this thread removes entries, so isEmpty() followed by remove(0) is safe.
            while (!list.isEmpty()) {
                Object responseMessage = list.remove(0);
                if (responseMessage != null) {
                    threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
                }
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                SelectionKey clientKey = clientSocket.register(selector,
                        SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                // Create the queue up front so the first read finds it.
                responseQueueFor(clientKey.hashCode());
            }
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;

        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                // NOTE(review): a non-blocking write may be partial; responses here are a few
                // bytes, so a single call is kept.
                client.write(ByteBuffer.wrap(responseByteData));
                // The key can already be gone if the connection was dropped meanwhile.
                SelectionKey key = client.keyFor(selector);
                String clientId = key == null ? "unknown" : String.valueOf(key.hashCode());
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client, client.keyFor(selector));
            }
        }
    }

    /**
     * Task that drains the data a client has sent and queues the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
        int hashCode;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.hashCode = selectionKey.hashCode();
        }

        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                // read() returns -1 once the peer has closed. Without closing our side the
                // key stays readable forever and the selector loop resubmits reads endlessly.
                boolean endOfStream = len < 0;
                if (data.length > 0) {
                    String readData = new String(data);
                    // Messages shorter than three characters must not break the prefix.
                    String prefix = readData.substring(0, Math.min(3, readData.length()));
                    System.out.println("接收到客户端[" + hashCode + "]数据 :[" + prefix + "]");
                    // dosomthing
                    if (!endOfStream) {
                        byte[] response = ("response" + prefix).getBytes();
                        responseQueueFor(hashCode).add(response);
                        client.register(selector, SelectionKey.OP_WRITE);
                    }
                }
                if (endOfStream) {
                    System.out.println("客户端[" + hashCode + "]关闭了连接");
                    closeClient(client, selectionKey);
                }
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + hashCode + "]关闭了连接");
                closeClient(client, selectionKey);
            } finally {
                // Allow the selector loop to submit the next read for this connection.
                selectionKeyMap.remove(hashCode);
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}