Java NIO 知识总结

概念

NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的 I/O 模型,也是 I/O 多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O 处理问题的有效方式。

通道 (Channel)

通道 Channel 是原 I/O 包中的流,可以通过它读取和写入数据。通道与流的不同之处在于:

  1. 流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写;
  2. Channel 可以异步读写,Stream则是阻塞地同步读写;
  3. Channel 总是从 Buffer 读取数据,或将数据写入到 Buffer 中。

Channel 接口源码如下:

1
2
3
4
public interface Channel extends Closeable {
public boolean isOpen();
public void close() throws IOException;
}

基本的 Channel 示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void test2() throws IOException{

//获取文件
RandomAccessFile file=new RandomAccessFile("C:\\WY\\hello.txt","rw");
FileChannel fileChannel=file.getChannel();
//获取Buffer
ByteBuffer byteBuffer=ByteBuffer.allocate(48);
int read=fileChannel.read(byteBuffer);
//读取过程
while (read!=-1){
byteBuffer.flip() ;
while (byteBuffer.hasRemaining()){
System.out.print((char) byteBuffer.get());
}
byteBuffer.clear();
read=fileChannel.read(byteBuffer);
}

fileChannel.close();
}

Channel有四种分类,如下:

  • FileChannel:从文件中读写数据
  • SocketChannel:通过TCP读写网络中的数据
  • DatagramChannel:通过UDP读写网络中的数据
  • ServerSocketChannel:监听新进来的TCP连接,对每一个新的连接都会创建一个SocketChannel

FileChannel

FileChannel 是操作文件的 Channel,我们可以通过 FileChannel 从一个文件中读取数据,也可以将数据写入到文件中。
注意,FileChannel 只能为阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//    写入文件
public static void test1() throws IOException{
// RandomAccessFile file=new RandomAccessFile("C:\\WY\\hello.txt","rw");
File file1=new File("C:\\WY\\hello.txt");
FileOutputStream outputStream=new FileOutputStream(file1);
FileChannel fileChannel=outputStream.getChannel();

String string="Hello world! ++++++++++++++++++"+System.currentTimeMillis();
// 操作 position
// long pos=fileChannel.position();
// fileChannel.position(pos+12);

ByteBuffer byteBuffer=ByteBuffer.allocate(48);
byteBuffer.put(string.getBytes());

byteBuffer.flip();

while (byteBuffer.hasRemaining()){
fileChannel.write(byteBuffer);
}
}

// 读取文件
public static void test2() throws IOException{

//获取文件
RandomAccessFile file=new RandomAccessFile("C:\\WY\\hello.txt","rw");
FileChannel fileChannel=file.getChannel();
//获取Buffer
ByteBuffer byteBuffer=ByteBuffer.allocate(48);
int read=fileChannel.read(byteBuffer);
//读取过程
while (read!=-1){
byteBuffer.flip() ;
while (byteBuffer.hasRemaining()){
System.out.print((char) byteBuffer.get());
}
byteBuffer.clear();
read=fileChannel.read(byteBuffer);
}

fileChannel.close();
}
}

SocketChannel

SocketChannel 是一个客户端用来进行 TCP 连接的 Channel。
创建一个 SocketChannel 的方法有两种:

  1. 打开一个 SocketChannel,然后将其连接到某个服务器中;
  2. 当一个 ServerSocketChannel 接受到连接请求时,会返回一个 SocketChannel 对象。

可以设置 SocketChannel 为 非阻塞模式,这样我们的 connectreadwrite 为非阻塞。

1
2
3
4
5
6
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://example.com", 80));

while(! socketChannel.finishConnect() ){
//wait, or do something else...
}

在非阻塞模式中,或许连接还没有建立,connect 方法就返回了,因此需要检查当前是否是连接到了主机,因此通过一个 while 循环来判断。

ServerSocketChannel

ServerSocketChannel 顾名思义,是用在服务器为端的,可以监听客户端的 TCP 连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void test4() throws IOException{
//打开ServerSocketChannel
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
// 设置非阻塞
serverSocketChannel.configureBlocking(false);
try {
while (true){
// 非阻塞模式下,没有连接时返回null
SocketChannel socketChannel=serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}
}catch (IOException e){
e.printStackTrace();
}finally {
serverSocketChannel.close();
}
}

DatagramChannel

DatagramChannel 是用来处理 UDP 连接的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void test5() throws IOException{
// 打开DatagramChannel
DatagramChannel datagramChannel=DatagramChannel.open();

//因为 UDP 是非连接的, 因此这个的 connect 并不是向 TCP 一样真正意义上的连接,
// 而是它会将 DatagramChannel 锁住, 因此我们仅仅可以从指定的地址中读取或写入数据
datagramChannel.socket().bind(new InetSocketAddress(9999));

// 读取数据
ByteBuffer byteBuffer=ByteBuffer.allocate(48);
datagramChannel.receive(byteBuffer);

// 写入数据
String newData = "String to write to file..."
+ System.currentTimeMillis();

ByteBuffer byteBuffer1 = ByteBuffer.allocate(48);
byteBuffer1.clear();
byteBuffer1.put(newData.getBytes());
byteBuffer1.flip();

int bytesSent = datagramChannel.send(byteBuffer1, new InetSocketAddress("example.com", 80));

}

Scatter/Gather

Java NIO 开始支持 scatter/gather,scatter/gather 用于描述从 Channel 中读取或者写入到 Channel 的操作。

Scatter

分散(scatter) 从 Channel 中读取是指在读操作时将读取的数据写入多个 buffer 中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。

1
2
3
4
5
6
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

channel.read(bufferArray);

注意 buffer 首先被插入到数组,然后再将数组作为 channel.read() 的输入参数。read() 方法按照 buffer 在数组中的顺序将从 channel 中读取的数据写入到 buffer,当一个 buffer 被写满后,channel 紧接着向另一个 buffer 中写。Scattering Reads在移动下一个buffer前,必须填满当前的buffer,这也意味着它不适用于动态消息(消息大小不固定)

Gather

聚集(gather) 写入Channel是指在写操作时将多个 buffer 的数据写入同一个 Channel,因此,Channel 将多个 Buffer 中的数据“聚集(gather)”后发送到 Channel。

1
2
3
4
5
6
7
8
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);

//write data into buffers

ByteBuffer[] bufferArray = { header, body };

channel.write(bufferArray);

buffers 数组是 write() 方法的入参,write() 方法会按照 buffer 在数组中的顺序,将数据写入到 channel,注意只有 position 和 limit 之间的数据才会被写入。因此与 Scattering Reads 相反,Gathering Writes 能较好的处理动态消息。

Channel 之间的数据传输

在Java NIO中,如果两个通道中有一个是 FileChannel,那你可以直接将数据从一个 Channel 传输到另外一个 Channel。

FileChannel 的 transferFrom() 方法可以将数据从源通道传输到 FileChannel 中。

1
2
3
4
5
6
7
8
9
10
RandomAccessFile  fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();

RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();

toChannel.transferFrom(position, count, fromChannel);

transferFrom 方法的输入参数 position 表示从 position 处开始向目标文件写入数据,count 表示最多传输的字节数。如果源通道的剩余空间小于 count 个字节,则所传输的字节数要小于请求的字节数。此外要注意,在 SocketChannel 的实现中,SocketChannel 只会传输此刻准备好的数据(可能不足 count 字节)。因此,SocketChannel 可能不会将请求的所有数据(count 个字节)全部传输到 FileChannel 中。

transferTo() 方法将数据从 FileChannel 传输到其他的 Channel 中。下面是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();

RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();

fromChannel.transferTo(position, count, toChannel);

上面所说的关于 SocketChannel 的问题在 transferTo() 方法中同样存在。SocketChannel 会一直传输数据直到目标 buffer 被填满。

内存映射文件

缓冲区 (Buffer)

一个 Buffer 其实就是一块内存区域,,我们可以在这个内存区域中进行数据的读写。NIO Buffer 其实是这样的内存块的一个封装,并提供了一些操作方法让我们能够方便地进行数据的读写。

分类

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • MappedByteBuffer (ByteBuffer子类)

属性

1
2
3
4
5
6
7
8
    //标记:一个备忘位置。调用 mark( ) 来设定 mark=postion 。调用reset( ) 设定 position= mark。标记在设定前是未定义的(undefined)
private int mark = -1;
//位置:下一个要被读或写的元素的索引。位置会自动由相应的 get() 和 put() 函数更新。
private int position = 0;
//上界:缓冲区的第一个不能被读或写的元素。或者说,缓冲区中现存元素的计数。
private int limit ;
//容量:缓冲区能够容纳的数据元素的最大数量。这一容量在缓冲区创建时被设定,并且不能被改变
private int capacity ;

属性之间的关系:0 <= mark <= position <= limit <= capacity

方法

  1. 基本方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    //返回容量
    public final int capacity();
    //返回位置
    public final int position();
    //设置容量
    public final Buffer position(int newPosition);
    //返回上届
    public final int limit() ;
    //标记当前position为mark
    public final Buffer mark();
    //重回mark位置
    public final Buffer reset();
    //一般在把数据写入Buffer前调用
    public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
    }
    //翻转:将缓存字节数组的指针设置为数组的开始序列即数组下标0。这样就可以从buffer开头,对该buffer进行遍历(读取)
    public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
    }
    // 一般在把数据重写入Buffer前调用
    public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
    }
    // limit - position
    public final int remaining();
    // position < limit
    public final boolean hasRemaining();
    public abstract boolean isReadOnly();
  2. 存取方法

    1
    2
    3
    4
    public abstract byte get( );
    public abstract byte get (int index);
    public abstract ByteBuffer put (byte b);
    public abstract ByteBuffer put (int index, byte b);
  3. 压缩方法

    1
    2
    //将position到limit之间的数据迁移至0开始处,然后limit=capacity
    public abstract ByteBuffer compact( );
  4. 比较方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    // true: 两个 Buffer 是相同类型
    // 两个 Buffer 的剩余的数据个数相同
    // 两个 Buffer 的剩余的数据都是相同的
    public boolean equals (Object ob)

    // 比较是针对每个缓冲区内 剩余数据 进行的,与它们在equals()中的方式相同
    // 直到不相等的元素被发现或者到达缓冲区的上界。
    // 如果一个缓冲区在不相等元素发现前已经被耗尽,较短 的缓冲区被认为是小于较长的缓冲区
    public int compareTo (Object ob)

直接缓冲区 VS 非直接缓冲区

  • 非直接缓冲区:通过 allocate() 方法分配缓冲区,直接在 JVM 堆上进行内存的分配,本质上是 byte[] 数组的封装。非直接缓冲区如下图:

当程序想要从硬盘中读取数据,先从物理硬盘把数据读取到物理内存中,再将内容复制到JVM的内存中,然后读取应用程序才可以读取到内容。读写都是需要复制这一个动作,当遇到大文本的文件时,效率低下。

  • 直接缓冲区:通过 allocateDirect() 方法分配直接缓冲区,所分配的内存不在 JVM 堆上,不受 GC 的管理,当进行一些底层的系统 IO 操作时,效率会比较高,因为此时 JVM 不需要拷贝 buffer 中的内存到中间临时缓冲区中。直接缓冲区如下图:

内核地址空间和用户地址空间之间形成了一个物理内存映射文件,减少了之间的复制过程。

  1. 如果为直接缓冲区,则 JVM 会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
  2. 直接缓冲区可以通过调用 allocateDirect() 工厂方法来创建,此方法实现的缓冲区分配和取消分配所需成本通常高于非直接缓冲区 。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响并不明显。
  3. 直接缓冲区还可以通过 FileChannel 的 map() 方法将文件区域直接映射到内存中来创建,该方法返回 MappedByteBuffer 。Java平台的实现有助于通过 JNI 从本机代码创建直接缓冲区。
  4. 字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定,提供此方法是为了能够在性能关键的代码中执行显式缓冲区管理。

ByteBuffer.allocateDirect() 和 MappedByteBuffer.load() 区别
DirectBuffer GC 所分配的内存不在 JVM 堆上,不受 GC 的管理。(但是 Direct Buffer 的 Java 对象是由 GC 管理的,因此当发生 GC,对象被回收时,Direct Buffer 也会被释放)

选择器 (Selector)

Selector(选择器)是 Java NIO 中能够检测一到多个 NIO 通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个 Channel,从而管理多个网络连接。

为了使用 Selector, 我们首先需要将 Channel 注册到 Selector 中, 随后调用 Selector 的 select()方法, 这个方法会阻塞直到注册在 Selector 中的 Channel 发送可读写事件。当这个方法返回后, 当前的这个线程就可以处理 Channel 的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public abstract class Selector implements Closeable {

protected Selector() { }
//实例化Selector
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

public abstract SelectorProvider provider();
//返回 注册到它们之上的通道的集合,不可以直接修改的
public abstract Set<SelectionKey> keys();
//返回 就绪的键.已注册的键的集合的子集,这个集合的每个成员都是相关的通道被选择器(在前一个选择操作中)判断为已经准备好的,并且包含于键的interest集合中的操作/
public abstract Set<SelectionKey> selectedKeys();
//非阻塞
public abstract int selectNow() throws IOException;
//设定时间内阻塞
public abstract int select(long timeout) throws IOException;
//完全阻塞
public abstract int select() throws IOException;
//停止阻塞中的select方法
public abstract Selector wakeup();
//测试一个选择器是否处于被打开的状态
public abstract boolean isOpen();
//释放它可能占用的资源并将所有相关的选择键设置为无效,一个在选择操作中阻塞的线程都将被唤醒,就像wakeup()方法被调用了一样。与选择器相关的通道将被注销,而键将被取消
public abstract void close() throws IOException;
}

创建选择器

通过 Selector.open() 方法, 我们可以创建一个选择器:

1
Selector selector = Selector.open();

将 Channel 注册到选择器中

为了使用选择器管理 Channel,我们需要将 Channel 注册到选择器中:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

注意,如果一个 Channel 要注册到 Selector 中,那么这个 Channel 必须是非阻塞的, 即 channel.configureBlocking(false);。因为 Channel 必须要是非阻塞的,因此 FileChannel 是不能够使用选择器的,因为 FileChannel 都是阻塞的。

另外,在使用 Channel.register() 方法时,第二个参数指定了我们对 Channel 的什么类型的事件感兴趣,这些事件有:

  • Connect, 即连接事件(TCP 连接),对应于 SelectionKey.OP_CONNECT,某个Channel成功连接到另一个服务器;
  • Accept,即确认事件,对应于SelectionKey.OP_ACCEPT,表示准备好接收新进入的连接;
  • Read,即读事件,对应于SelectionKey.OP_READ,表示 buffer 可读
  • Write,即写事件,对应于SelectionKey.OP_WRITE,表示 buffer 可写

我们可以使用 或运算 | 来组合多个事件,例如:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

注意,一个 Channel 仅仅可以被注册到一个 Selector 一次,如果将 Channel 注册到 Selector 多次,那么其实就是相当于更新 SelectionKey 的 interest set。
例如:

1
2
channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

上面的 channel 注册到同一个 Selector 两次,那么第二次的注册其实就是相当于更新这个 Channel 的 interest set 为 SelectionKey.OP_READ | SelectionKey.OP_WRITE。

Selector 维护的 selection key

  • key set 包含着所有 selectionKey,当前所有注册到 Selector 中的Channel 返回的 SelectionKey 都包含在内,这个集合可以通过 selector.keys() 方法返回。
  • selected-key set 是 key set 的子集,其中的每个 selectionKey 所关联的 Channel 在 selection operation 期间被检测出至少准备好了一个可以在兴趣集中匹配到的操作。这个集合可以通过调用 selector.selectedKeys() 方法返回。
  • cancelled-key set 也是 key set 的子集,其中的每个 selectionKey 都已经被取消,但是所关联 Channel 还没有被撤销登记。cancelled-key set 不能够被直接返回。

对于一个新创建的 Selector,这三个集合都是空的。

通过 Channel 的 register 方法,一个 selectionKey 被增加到 Selector 的 key set 中。

当通过 channel.close()selectionKey.cancel() 来取消一个selectionKey,这个 selectionKey 都会被立即添加到 Selector 的 cancelled-key set 中,但是所关联的 Channel 并没有立即被撤销登记,直到发生下次 selection operations,这些 Channel 才被从 Selector 中撤销登记,与此同时这些 Cancelled keys 才会从 Selector 的所有 selectionKey set(可能是 key set_、selected-key set、cancelled-key set)中移除,但是不会影响这些集合本身。

在 selection operations 期间,一些 selectionKey 会被选中添加到 selected-key set 中。其中的每个 key 可以通过 selectiedKeys.remove()selectiedKeys.iterator().remove() 直接从 selected-key set 中移除,除此之外不能够通过任何方式被直接移除。特殊的,selected-key set 中的keys 还可以在 selection operations 期间被间接移除。但是是不可以直接向 selected-key set 添加 key 的。

Selector 如何选择就绪 Channel

每次 selection operation 期间, keys 都可以添加到 selected-key set 或从 selected-key set 中移除,同时也可以从它的 key set 和 cancelled-key set 中移除。 selection operation 通过执行 selector.select()selector.select(long),和 selector.selectNow() 方法被触发,并且这些方法涉及到以下三个步骤:

  1. 首先每个位于 cancelled-key set 中的 key 会从每个包含它的 key 集合中被移除,并且对应的 Channel 会被撤销登记。这个步骤使得 cancelled-key set 为空。

  2. 查询底层操作系统来获得 Selector 中剩余 Channel 的就续事件从 selection operation 开始到此刻的更新情况,只要存在 Channel 的就续事件的更新部分有至少一个与兴趣集中的操作匹配上,那么将会执行以下两个动作:

    • 如果这个 Channel 对应的 SelectionKey 没有存在于 selected-key set中,那么将它添加到这个集合中,并将它的就绪操作集(ready set)修改成目前就绪的操作,任何先前记录在就绪操作集中的就绪信息都会被丢弃。
    • 否则,如果这个 Channel 对应的 SelectionKey 存在于 selected-key set 中,那么则保留就绪操作集中先前的就绪信息,并将这些刚刚就绪的操作 写入进去;总而言之,系统底层会通过按位与 & 操作更新当前就绪集。

如果这些 SelectionKey 的兴趣集( interest set )为空,那么 selected-key set 和 所有这些 key 的就续集(ready set)都不会被更新。

  1. 如果在步骤 2 正在进行时将任何 key 添加到 cancelled-key set,则按步骤 1 处理它们。

selection operations 是否会阻塞等待一个或多个通道准备就绪,以及等待多长时间,这是三种选择方法之间唯一的本质区别。

Selector 是否线程安全

多线程并发情况下 Selectors 本身是线程安全的,但是他们所持有的 key set不是线程安全的。

selection operations 在 selector 本身,key set 和 selected-key set 上是同步的,还在上面的步骤 1 和 3 中涉及的 canceled-key set 上同步。

在 selection operations 期间改变 SelectionKey 的兴趣集,对于本次操作将不会产生任何影响;影响将会在下次 selection operations 期间发生。

SelectionKey 可能会被取消,Channel 可能随时关闭。 因此,在一个或多个选择器的 key 集合中存在并不意味着 SelectionKey 有效或其 Channel 是开放的。有可能另一个线程取消 SelectionKey 或关闭一个 Channel,应用程序代码应该小心地同步并检查这些条件。

一个线程通过 selector.select()selector.select(long) 方法产生的阻塞可以被其他线程用以下三种方式的任意一种来中断:

  1. 调用 selector.wakeup() 方法;
  2. 调用 selector.close() 方法;
  3. 调用阻塞线程的 interrupt() 方法,此时线程会中断,selector 的wakeup() 方法会被调用。

selector.close() 在 selection operations 期间会顺序同步 selector 和三个 key set。

一个 selector 的 key set 和 selected-key set 通常情况下是线程不安全的。如果一个线程想要修改这个集合,需要同步控制它。通过 key set 的 iterator() 方法返回的 Iterators 提供了 fail-fast:如果在创建迭代器之后修改了 set,除了通过调用迭代器自己的 remove() 方法之外,将抛出 ConcurrentModificationException 。

SelectionKey

当我们使用 register 注册一个 Channel 时,会返回一个 SelectionKey 对象,这个对象包含了如下内容:

  • interest set
  • ready set
  • channel
  • selector
  • attached object,可选的附加对象
interest set

即我们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set。我们可以通过如下方式获取 interest set:

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
ready set

ready set 代表了 Channel 所准备好的操作。我们可以像判断 interest set 一样操作 ready set,但是我们还可以使用如下方法进行判断:

1
2
3
4
5
6
int readySet = selectionKey.readyOps();

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel 和 Selector

我们可以通过 SelectionKey 获取相对应的 Channel 和 Selector:

1
2
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

Attaching Object

我们可以在 selectionKey 中附加一个对象:

1
2
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

或者在注册时直接附加:

1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

通过 Selector 选择 Channel

我们可以通过 Selector.select() 方法获取对某件事件准备好的 Channel,即如果我们在注册 Channel 时,对其的 可写事件 感兴趣,那么当 select() 返回时,我们就可以获取 Channel 了。注意,select() 方法返回的值表示有多少个 Channel 可操作。

获取可操作的 Channel

如果 select()方法返回值表示有多个 Channel 准备好,那么我们可以通过 Selected key set 访问这个 Channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

在每次迭代时,最后要调用 keyIterator.remove() 将这个 key 从迭代器中删除,因为 select() 方法仅仅是简单地将就绪的 IO 操作放到 selectedKeys 集合中,因此如果我们从 selectedKeys 获取到一个 key,但是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中。注意,我们可以动态更改 SekectedKeys 中的 key 的 interest set。例如在 OP_ACCEPT 中,我们可以将 interest set 更新为 OP_READ,这样 Selector 就会将这个 Channel 的 读 IO 就绪事件包含进来了。

Selector 的基本使用流程

  1. 通过 Selector.open() 打开一个 Selector
  2. 将 Channel 注册到 Selector 上,并设置需要监听的事件(interest set)
  3. 不断重复:
    • 调用 select() 方法
    • 调用 selector.selectedKeys() 获取 selected keys
    • 迭代每个 selected key:
      • 从 selected key 中获取 对应的 Channel 和附加信息(如果存在)
      • 判断是哪些 IO 事件已经就绪,然后处理。如果是 OP_ACCEPT 事件, 则调用 SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept() 获取 SocketChannel,并将它设置为非阻塞的,然后将这个 Channel 注册到 Selector 中。
      • 根据需要更改 selected key 的监听事件。
      • 将已经处理过的 key 从 selected keys 集合中删除。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class SelectorTest {

private static final int READ_BUF_SIZE = 1024;
private static final int WRITE_BUF_SIZE = 256;
private static final int TIMEOUT = 3000;

public static void main(String[] args) throws IOException {

// 打开ServerSocketChannel
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
// 打开 Selector
Selector selector=Selector.open();
// 服务端 Socket 监听8080端口, 并配置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1",9980));
serverSocketChannel.configureBlocking(false);

// 将 channel 注册到 selector 中.
// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
// 注册到 Selector 中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

ByteBuffer readBuffer=ByteBuffer.allocate(READ_BUF_SIZE);

ByteBuffer writeBuffer=ByteBuffer.allocate(WRITE_BUF_SIZE);


while (true){
// 调用 select 方法阻塞等待
if(selector.select(TIMEOUT)==0){
System.out.println("继续等待...");
continue;
}

// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
Iterator<SelectionKey> selectionKeyIterator=selector.keys().iterator();

while (selectionKeyIterator.hasNext()){
SelectionKey key = selectionKeyIterator.next();
// 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
selectionKeyIterator.remove();

if (key.isAcceptable()){
// 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
// 代表客户端的连接
// 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
SocketChannel socketChannel=((ServerSocketChannel)key.channel()).accept();
socketChannel.configureBlocking(false);
//在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
// 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
socketChannel.register(selector, OP_READ);
}else if (key.isReadable()){
SocketChannel socketChannel=(SocketChannel) key.channel();
readBuffer.clear();
long read=socketChannel.read(readBuffer);
// -1 表示读到末尾,关闭Channel
if (read==-1){
socketChannel.close();
}else if(read>0){
readBuffer.flip();
System.out.println("received : " + new String(readBuffer.array()));
key.interestOps(SelectionKey.OP_WRITE| OP_READ);
}
}else if(key.isWritable()){
writeBuffer.rewind();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuffer);
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}

BIO 与 NIO 区别

IO NIO
阻塞 非阻塞
面向流 面向块
选择器

基于 Stream 与基于 Buffer

传统的 IO 是面向字节流或字符流的,而在 NIO 中,我们抛弃了传统的 IO 流,而是引入 Channel 和 Buffer 的概念。在 NIO 中,我只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。
那什么是基于 Stream 呢?在一般的 Java IO 操作中,我们以流式的方式顺序从一个 Stream 中读取一个或多个字节,因此我们不能随意改变读取指针的位置。而基于 Buffer 就显得有点不同了。我们首先需要从 Channel 中读取数据到 Buffer 中,当 Buffer 中有数据后,我们就可以对这些数据进行操作了。不像 IO 那样是顺序操作,NIO 中我们可以随意地读取任意位置的数据。

阻塞和非阻塞

Java 提供的各种 Stream 操作都是阻塞的,例如我们调用一个 read 方法读取一个文件的内容,那么调用 read 的线程会被阻塞住,直到 read 操作完成。
而 NIO 的非阻塞模式允许我们非阻塞地进行 IO 操作。例如我们需要从网络中读取数据,在 NIO 的非阻塞模式中,当我们调用 read 方法时,如果此时有数据,则 read 读取并返回;如果此时没有数据,则 read 直接返回,而不会阻塞当前线程。

Selector

Selector 是 NIO 中才有的概念,它是 Java NIO 之所以可以非阻塞地进行 IO 操作的关键。通过 Selector,一个线程可以监听多个 Channel 的 IO 事件,当我们向一个 Selector 中注册了 Channel 后,Selector 内部的机制就可以自动地为我们不断地查询(select)这些注册的 Channel 是否有已就绪的 IO 事件(例如可读、可写、网络连接完成等)。通过这样的 Selector 机制,我们就可以很简单地使用一个线程高效地管理多个 Channel。

管道 (Pipe)

Java NIO Pipe是线程之间的单向数据连接。在多线程编程中除了 wait()、notify()、notifyAll()等,增加了一种新的线程间通讯方式。Pipe 有一个 source 通道和一个 sink 通道。数据会被写到 sink 通道,从 source 通道读取。

开启管道

1
Pipe pipe = Pipe.open();

写入数据

1
2
3
4
5
Pipe.SinkChannel sinkChannel = pipe.sink();

sinkChannel.write(ByteBuffer src) ;
sinkChannel.write(ByteBuffer[] srcs) ;
sinkChannel.write(ByteBuffer[] srcs, int offset, int length) ;

读取数据

1
2
3
4
5
Pipe.SourceChannel sourceChannel = pipe.source();

sourceChannel.read(ByteBuffer dst);
sourceChannel.read(ByteBuffer[] dsts);
sourceChannel.read(ByteBuffer[] dsts, int offset, int length);

Path

Java 中的 Path 表示文件系统的路径,可以指向文件或文件夹,也有相对路径和绝对路径之分。
在很多方面,java.nio.file.Path 接口和 java.io.File 有相似性,但也有一些细微的差别。在很多情况下,可以用 Path 来代替 File 类。

创建Path实例

使用 Paths 类的静态方法 Paths.get() 来产生一个实例。

1
Path path = Paths.get("c:\\data\\hello.txt");

创建绝对路径Path

1
Path path = Paths.get("/home/wy/myfile.txt");

创建相对路径Path

可以通过 Paths.get(basePath, relativePath) 创建一个相对路径 Path。

1
Path projects = Paths.get("d:\\data", "projects");

Path.normalize()

Path 的 normalize() 方法可以标准化路径。标准化的含义是路径中的 . 和 .. 都去掉,指向真正的路径目录地址。

1
Path path2 = path1.normalize();

Files

Java NIO Files类 (java.nio.file.Files) 提供了操作文件的相关方法。

参考

  1. Java NIO-阅读笔记及总结
  2. Java NIO 的前生今世 之一 简介
  3. Java NIO 的前生今世 之二 NIO Channel 小结
  4. Java NIO 的前生今世 之三 NIO Buffer 详解
  5. Java NIO 的前生今世 之四 NIO Selector 详解
  6. Java NIO笔记
  7. Java NIO系列教程(十一) Pipe
  8. NIO之坑:完全理解NIO Selector
  9. Java NIO 之 Selector(选择器)