NIO之Selector选择器

Selector选择器

旧版本中socket相关的api中对于每个连接都需要有一个线程来控制,此时IO操作是阻塞的。

传统io的操作

为了解决上面的问题,我们通过Selector可以管理多个channel的IO状态,一个线程可以管理一个Selector,这样看来一个线程就可以管理多个channel了,减少了多个线程之间上下文切换的开销。此时的IO就不是阻塞的了,Selector是非阻塞的核心部分。多个Channel注册到1个selector上面,当有事件发生之后,selector会获取并进行处理。

Selector和Channel的基本使用

我们可以利用Selector中的open方法创建一个Selector对象

Selector selector = Selector.open();

当需要关闭Selector的时候,调用其close方法即可。

Selector.close();

将Channel注册到Selector示例代码,需要注意的是要讲Channel设置为非阻塞,否则会抛IllegalBlockingModeException异常。并不是所有的Channel都能注册到Selector,只有是SelectableChannel的子类才可以。

//获取Selector对象
Selector selector = Selector.open();
//获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//绑定连接
serverSocketChannel.bind(new InetSocketAddress(8888));
//将Channel注册到Selector上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

上面注册register方法中的第二个参数表示要监听的事件:

  • 读 : SelectionKey.OP_READ (1)
  • 写 : SelectionKey.OP_WRITE (4)
  • 连接 : SelectionKey.OP_CONNECT (8)
  • 接收 : SelectionKey.OP_ACCEPT (16)

如果要注册多个事件,可以使用位或运算符连接

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT|SelectionKey.OP_CONNECT);

将Channel注册到Selector上之后,我们可以通过Selector的select()方法,查询出已经就绪的Channel操作,这些就绪的状态放到了Set集合中,利用Selector中的selectedKeys()方法可以获取该集合。

//获取状态set集合
Set selectedKeys = selector.selectedKeys();

Iterator keyIterator = selectedKeys.iterator();
//遍历集合
while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();
	//判断状态
    if(key.isAcceptable()) {
		//接收
    } else if (key.isConnectable()) {
		//连接
    } else if (key.isReadable()) {
		//读
    } else if (key.isWritable()) {
		//写
    }
	
    //处理完成之后要移除,防止下次循环时重复处理。
    keyIterator.remove();

}

案例:聊天系统

下面利用NIO的一些类实现简易的聊天系统,创建Server端:

package com.monkey1024;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * 服务端
 */
public class Server {
    public static void main(String[] args) {
        //获取Channel
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            //设置为非阻塞
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.bind(new InetSocketAddress(8888));
            //获取Selector
            Selector selector = Selector.open();

            //将channel注册到selector上, 传入要监听的事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            //获取迭代器
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

            //判断是否有Channel就绪
            while (selector.select() > 0) {
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    //判断是否是可接收的
                    if (selectionKey.isAcceptable()) {
                        //获取客户端连接
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        //设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将该通道注册到选择器上
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {
                        //判断是否是可读的
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int length;
                        while ((length = socketChannel.read(byteBuffer)) > 0) {
                            byteBuffer.flip();
                            System.out.println(new String(byteBuffer.array(), 0, length));
                            byteBuffer.clear();
                        }
                    }
                    //移除,防止重复处理
                    iter.remove();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

创建Client端

package com.monkey1024;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
  客户端
 */
public class Client {

   public static void main(String[] args) {
      //获取通道
      try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888))) {
         //设置非阻塞
         socketChannel.configureBlocking(false);

         ByteBuffer buf = ByteBuffer.allocate(1024);

         Scanner scan = new Scanner(System.in);
         //获取用户输入的内容
         while (scan.hasNext()) {
            String input = scan.nextLine();
            buf.put(input.getBytes());
            buf.flip();
            socketChannel.write(buf);
            buf.clear();
         }

      } catch (Exception e) {
         e.printStackTrace();
      }
   }
}