BIO | NIO | AIO | |
---|---|---|---|
IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
编程难度 | 简单 | 复杂 | 复杂 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |
阻塞 IO
和 非阻塞 IO
这两个概念是 程序级别
的。主要描述是程序请求操作系统 IO 操作之后,如果 IO 资源没有准备好,那么程序如何处理问题:前者等待,后者继续执行(并且使用线程一直轮询,直到有 IO 资源准备好)
同步 IO
和 非同步IO
这两个概念是操作系统级别
的。主要描述的是操作系统在收到程序请求 IO 操作后,如果 IO 资源没有准备好,该如何相应程序的问题:前者不响应,后者返回一个标记,当 IO 资源准备好之后,在用事件机制返回给程序。
Java BIO:同步并阻塞(传统阻塞性),应用程序中进程在发起 IO 调用后至内核执行 IO 操作返回结果之前,若发起系统调用的线程一直处于等待状态,则此次 IO 操作为阻塞 IO。阻塞 IO 简称 BIO,Blocking IO。
以前大多数网络通信方式都是阻塞模式,即:
package com.atguigu.bio;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOServer {
public static void main(String[] args) throws Exception {
//线程池机制
//思路
//1. 创建一个线程池
//2. 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
//创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true) {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
//监听,等待客户端连接
System.out.println("等待连接....");
//会阻塞在accept()
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(new Runnable() {
public void run() {//我们重写
//可以和客户端通讯
handler(socket);
}
});
}
}
//编写一个handler方法,和客户端通讯
public static void handler(Socket socket) {
try {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true) {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
System.out.println("read....");
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
传统的 IO 模型,其主要是一个 Server 对接 N 个客户端,在客户端连接之后,为每个客户端分配一个子线程。如图所示:
从图中可以看出,传统 IO 的特点在于:
如果并发量不大,运行没有问题,但是如果海量并发时候,就会出现问题:
上述说的情况只是服务器只有一个线程的情况,那么如果引入多线程是不是可以解决这个问题:
监听模式/观察模式
(等其他设计模式)通知主线程。
但是多线程解决这个问题有局限性:
为啥
serverSocket. accept()
会出现阻塞?
是因为 Java 通过 JNI 调用的系统层面的 accept0()
方法,accept0()
规定如果发现套间字从指定的端口来,就会等待。其实就是内部实现是操作系统级别的同步 IO。
了解 NIO 之前我们先来看看标准 I/O(Standard I/O)。
Standard I/O 是对字节的读写,在进行 I/O 之前,首先创建一个流对象,流对象的读写操作都是按字节,一个字节一个字节的读或者写。而 NIO 把 I/O 抽象成块,类似磁盘的读写,每次 I/O 操作的单位都是一个块,块被读入内存之后就是一个 byte[]
,NIO 一次可以读或者写多个字节。
IO 和 NIO 最重要的区别就是对数据的打包和传输的方式,IO 是以流的方式处理数据,而 NIO 以块的方式处理数据。
面向流的 IO 一次性处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,链接几个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的 IO 通常处理非常慢。
面向块的 I/O 一次性处理一个数据块:按块处理数据比按流处理数据要快的多,但是面向块的 I/O 确实一些面向流的 I/O 所具有的优雅和简单。
I/O 包和 NIO 已经很好的集成了,java.io.*
中已经以 NIO 重新实现了,可以利用一些 NIO 的特性。例如:在 java.io.*
中某些类包含以块的形式读写数据的操作,这使得及时在面向流的系统中,处理数据也会更快。
Java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮训到连接有 I/O 请求就进行处理。
核心概念:
flip
方法切换 Channel
是双向的 。register(Selector selector, int ops)
),一个 Selector 可以注册多个 SocketChannel。ByteBuffer
字节数据;ShortBuffer
字符串数据;CharBuffer
字符数据;IntBuffer
整数;LongBuffer
长整数;DoubleBuffer
小数;FloatBuffer
小数
Buffer 类提供了 4 个属性来提供数据元素信息:capacity(容量)
:缓存区的最大容量,Limit(终点)
:缓存区最大可操作位置,Position(位置)
:缓存区当前在操作的位置,Mark(标记)
:标记位置
public abstract class Buffer{
public final int capacity();
public final int position();
public final Buffer position(int newPosition);
public final int limit();
public final Buffer limit(int newLimit);
}
//其中比较常用的就是ByteBuffer(二进制数据),该类主要有以下方法
public abstract class ByteBuffer(){
public static ByteBuffer allocateDirect(int capacity);//直接创建缓冲区
public static ByteBuffer allocate(int capacity);//设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array);//把一个数组放入到缓冲区使用
//构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset,int length);
//缓冲区读取相关API
public abstract byte get();//从当前位置position上get,get之后,positon会+1
public abstract byte get(int index);//从绝对位置获取
public abstract ByteBuffer put(byte b);//当前位置上put,put之后,position会+1
public abstract ByteBuffer put(int index,byte b);//从绝对位置put
}
状态变量的改变过程举例:
① 新建一个大小为 8 个字节的缓冲区,此时 position 为 0,而 limit = capacity = 8。capacity 变量不会改变,下面的讨论会忽略它。
② 从输入通道中读取 5 个字节数据写入缓冲区中,此时 position 移动设置为 5,limit 保持不变。
③ 在将缓冲区的数据写到输出通道之前,需要先调用 flip() 方法,这个方法将 limit 设置为当前 position,并将 position 设置为 0。
④ 从缓冲区中取 4 个字节到输出缓冲中,此时 position 设为 4。
⑤ 最后需要调用 clear() 方法来清空缓冲区,此时 position 和 limit 都被设置为最初位置。
以下展示了使用 NIO 快速复制文件的实例:
public static void fastCopy(String src, String dist) throws IOException {
/* 获得源文件的输入字节流 */
FileInputStream fin = new FileInputStream(src);
/* 获取输入字节流的文件通道 */
FileChannel fcin = fin.getChannel();
/* 获取目标文件的输出字节流 */
FileOutputStream fout = new FileOutputStream(dist);
/* 获取输出字节流的通道 */
FileChannel fcout = fout.getChannel();
/* 为缓冲区分配 1024 个字节 */
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while (true) {
/* 从输入通道中读取数据到缓冲区中 */
int r = fcin.read(buffer);
/* read() 返回 -1 表示 EOF */
if (r == -1) {
break;
}
/* 切换读写 */
buffer.flip();
/* 把缓冲区的内容写入输出文件中 */
fcout.write(buffer);
/* 清空缓冲区 */
buffer.clear();
}
}
通道类似流,但是有如下区别:
Channel 在 NIO 中是一个接口 public interface Channle extends Closeable{}
。其中,常用的 Channel 类有:
FileChannel
:用于文件的数据读写;DatagramChannel
:用于 UDP 的数据读写;ServerSocketChannel
:可以监听新来的连接,对每一个新进来的连接都会创建一个 SocketChannel。只有通过这个通道,应用程序才能箱操作系统注册支持“多路复用 IO”的端口减轻。支持 TCP 和 UDP 协议;SocketChannel
:TCP Socket 套接字的监听通道,用于 TCP 的数据读写
对本地文件进行 IO 操作,常用方法及实例应用:
//从通道读取数据并放到缓冲区内
public int read(ByteBuffer content);
//从缓冲区写数据到通道中
public int write(ByteBuffer content);
//从目标通道中复制数据到当前通道内
public long transferFrom(ReadableByteChannel src,long position,long count);
//把数据从当前通道复制到目标通道
public long transferTo(long position,long count,WritabelByteChannel target);
1 . 写入文件,使用之前 ByteBuffer
和 FileChannel
类
//使用之前ByteBuffer和FileChannel类,写入文件
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel{
public static void main(String[] args) throws Exception{
String str = "hello,world";
//创一个输出流 -> channel
FileOutputStream stream = new FileOutputStream("d:\\file.txt");
//通过 stream 获取对应的 FileChannel
//这个 fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = stream.getChannel();
//创建一个缓冲区 ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将 str 放入到缓冲区
byteBuffer.put(str.getBytes());
//对 byteBuffer 进行 flip
byteBuffer.flip();
//将 byteBuffer 写入到 fileChannel
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
}
2 . 读取文件数据并展示,使用之前 ByteBuffer
和 FileChannel
类
//读取本地文件
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel{
public static void main(String[] args) throws Exception{
//创一个输出流 -> channel
File file = new File("d:\\file.txt");
FileOutputStream stream = new FileOutputStream(file);
//通过 stream 获取对应的 FileChannel
//这个 fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = stream.getChannel();
//创建一个缓冲区 ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
//将 byteBuffer 写入到 fileChannel
fileChannel.read(byteBuffer);
//将 byteBuffer的字节转化成String
System.out.println(new String(byteBuffer.array()));
fileOutputStream.close();
}
}
3 . 使用一个 Buffer
完成文件的读取、写入
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel03 {
public static void main(String[] args) throws Exception {
FileInputStream fileInputStream = new FileInputStream("1.txt");
FileChannel fileChannel01 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("2.txt");
FileChannel fileChannel02 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true) { //循环读取
//这里有一个重要的操作,一定不要忘了
/*
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
*/
byteBuffer.clear(); //清空 buffer
int read = fileChannel01.read(byteBuffer);
System.out.println("read = " + read);
if (read == -1) { //表示读完
break;
}
//将 buffer 中的数据写入到 fileChannel02--2.txt
byteBuffer.flip();
fileChannel02.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
}
}
4 . 拷贝文件transferFrom 方法
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
public class NIOFileChannel04 {
public static void main(String[] args) throws Exception {
//创建相关流
FileInputStream fileInputStream = new FileInputStream("d:\\a.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("d:\\a2.jpg");
//获取各个流对应的 FileChannel
FileChannel sourceCh = fileInputStream.getChannel();
FileChannel destCh = fileOutputStream.getChannel();
//使用 transferForm 完成拷贝
destCh.transferFrom(sourceCh, 0, sourceCh.size());
//关闭相关通道和流
sourceCh.close();
destCh.close();
fileInputStream.close();
fileOutputStream.close();
}
}
1. ByteBuffer 支持类型化的 put 和 get,put 放什么,get 取出什么,不然出现 BufferUnderflowException 异常
import java.nio.ByteBuffer;
public class NIOByteBufferPutGet {
public static void main(String[] args) {
//创建一个 Buffer
ByteBuffer buffer = ByteBuffer.allocate(64);
//类型化方式放入数据
buffer.putInt(100);
buffer.putLong(9);
buffer.putChar('尚');
buffer.putShort((short) 4);
//取出
buffer.flip();
System.out.println();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());
}
}
2. 普通 Buffer 转成只读 Buffer
import java.nio.ByteBuffer;
public class ReadOnlyBuffer {
public static void main(String[] args) {
//创建一个 buffer
ByteBuffer buffer = ByteBuffer.allocate(64);
for (int i = 0; i < 64; i++) {
buffer.put((byte) i);
}
//读取
buffer.flip();
//得到一个只读的 Buffer
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
//读取
while (readOnlyBuffer.hasRemaining()) {
System.out.println(readOnlyBuffer.get());
}
readOnlyBuffer.put((byte) 100); //ReadOnlyBufferException
}
}
3. NIO 中 MappedByteBuffer,可以让文件直接在堆外内存修改
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
* 说明 1.MappedByteBuffer 可让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
*/
public class MappedByteBufferTest {
public static void main(String[] args) throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
//获取对应的通道
FileChannel channel = randomAccessFile.getChannel();
/**
* 参数 1:FileChannel.MapMode.READ_WRITE 使用的读写模式
* 参数 2:0:可以直接修改的起始位置
* 参数 3:5: 是映射到内存的大小(不是索引位置),即将 1.txt 的多少个字节映射到内存
* 可以直接修改的范围就是 0-5
* 实际类型 DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException
randomAccessFile.close();
System.out.println("修改成功~~");
}
}
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
/**
* Scattering:将数据写入到 buffer 时,可以采用 buffer 数组,依次写入 [分散]
* Gathering:从 buffer 读取数据时,可以采用 buffer 数组,依次读
*/
public class ScatteringAndGatheringTest {
public static void main(String[] args) throws Exception {
//使用 ServerSocketChannel 和 SocketChannel 网络
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
//绑定端口到 socket,并启动
serverSocketChannel.socket().bind(inetSocketAddress);
//创建 buffer 数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
//等客户端连接 (telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
int messageLength = 8; //假定从客户端接收 8 个字节
//循环的读取
while (true) {
int byteRead = 0;
while (byteRead < messageLength) {
long l = socketChannel.read(byteBuffers);
byteRead += l; //累计读取的字节数
System.out.println("byteRead = " + byteRead);
//使用流打印,看看当前的这个 buffer 的 position 和 limit
Arrays.asList(byteBuffers).stream().map(buffer -> "position = " + buffer.position() + ", limit = " + buffer.limit()).forEach(System.out::println);
}
//将所有的 buffer 进行 flip
Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());
//将数据读出显示到客户端
long byteWirte = 0;
while (byteWirte < messageLength) {
long l = socketChannel.write(byteBuffers);//
byteWirte += l;
}
//将所有的buffer进行clear
Arrays.asList(byteBuffers).forEach(buffer -> {
buffer.clear();
});
System.out.println("byteRead = " + byteRead + ", byteWrite = " + byteWirte + ", messagelength = " + messageLength);
}
}
}
NIO 常常被叫做非阻塞 IO,主要是因为 NIO 在网络通信中的非阻塞特性被广泛使用。NIO 实现了 IO 多路复用中的 Reator 模型,一个线程 Thread 使用一个选择器 Selector 通过轮询的方式去监听多个 Channel 上的事件,从而让一个线程能够处理多个事件。
通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入到阻塞状态一直等待,而是鸡血轮询其他 Channel,找到 IO 事件已经到达的 Channel 执行。
以为创建和切换线程的开销很大,因此使用一个线程处理多个事件显然比一个线程处理一个事件具有更好的性能。
多线程之间的上下文切换导致的开销
//Selector 类是一个抽象类,常用方法和说明如下:
public abstract class Selector implements Closeable{
public static Selector open();//监控所有注册的通道,当其中有IO操作可以进行时,将SelectionKey加入到内部的集合中并返回,参数用来设置超时时间
public Set<SelectionKey> selectedKey();//从内部集合中得到所有的SelectionKey
}
Selector selector = Selector.open();
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector,SelectionKey.OP_ACCEPT);
将通道注册到选择器上,还需要指定要注册的具体事件,主要有以下几类:
SelectionKey. OP_CONNECT
、SelectionKey. OP_ACCEPT
、SelectionKey. OP_READ
、 SelectionKey. OP_WRITE
他们在 SelectionKey 的定义如下:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
可以看出每个事件都能当成一个位域,从而组成事件集整数。例如:
int intersetSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
int num = selector.select();
使用 select()
方法来监听到达的事件,它会一直阻塞知道有至少一件事件到达。
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while(keyIterator.hasNext()){
SelectionKey keyu = keyIterator.next();
if(key.isAcceptabnle()){
// ...
}else if(key.isReadable()){
// ...
}
keyIterator.remove();
}
因为一次 select() 调动不能处理完所有的事件,并且服务器端有可能需要一直监听事件,因此服务器端处理时间的代码一般会放在一个死循环内。
while (true) {
int num = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// ...
} else if (key.isReadable()) {
// ...
}
keyIterator.remove();
}
}
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSocket serverSocket = ssChannel.socket();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
serverSocket.bind(address);
while(true){
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();// 服务器会为每个新连接创建一个 SocketChannel SocketChannel sChannel = ssChannel1.accept();
sChannel.configureBlocking(false);// 这个新连接主要用于从客户端读取数据
sChannel.register(selector, SelectionKey.OP_READ);
}else if (key.isReadable()) {
SocketChannel sChannel = (SocketChannel) key.channel();
System.out.println(readDataFromSocketChannel(sChannel));
sChannel.close();
}
keyIterator.remove();
}
}
}
private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder data = new StringBuilder();
while(true) {
buffer.clear();
int n = sChannel.read(buffer);
if (n == -1) {
break;
}
buffer.flip();
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i = 0;i < limit;i++) {
dst[i] = (char) buffer.get(i);
}
data.append(dst);
buffer.clear();
}
return data.toString();
}
}
public class NIOClient {
public static void main(String[] args) throws IOException{
Socket socket = new Socket("127.0.0.1",8888);
OutputStream out = socket.getOutputStream();
String s = "hello world";
out.write(s.getBytes());
out.close();
}
}
目前流程的多路复用 IO 实现主要宝库了四种:select、poll、epoll、kqueue。以下是其特性及区别:
IO 模型 | 相对性能 | 关键思路 | 操作系统 | Java 支持情况 |
---|---|---|---|---|
select | 较高 | Reactor | Win/Linux | 支持,Reactor 模式(反应器设计模式)。Linux kernels 2.4 内核版本之前,默认用的是 select ;目前 windows 下对吧同步 IO 的支持,都是 select 模型 |
poll | 较高 | Reactor | Linux | Linux 下的 Java 的 NIO 框架,Linux kernels 2.6 内核版本之前使用 poll 进行支持。也是使用的 Reactor 模式 |
epoll | 高 | Reactor/Proactor | Linux | Linux kernels 2.6 内核版本之后使用 epoll 进行支持 |
kqueue | 高 | Proactor | Linux | 目前 Java 版本不支持 |
从图上可知:一个完整的 Reactor 事件驱动模型是有四个部分组成:客户端 Client,Reactor,Acceptor 和时间处理 Handler。其中 Acceptor 会不间断的接收客户端的连接请求,然后通过 Reactor 分发到不同 Handler 进行处理。改进后的 Reactor 有如下优点:
在上面的处理模型中,由于网络读写是在同一个线程里面。在高并发情况下,会出现两个瓶颈:
基于上述瓶颈,可以将业务处理和 IO 读写分离出来:
如图可以看出,相对基础 Reactor 模型,该模型有如下特点:
这种模型在接收请求进行网络读写的同时,也在进行业务处理,大大提高了系统的吞吐量。但是也有不足的地方:
由于高并发的网络读写是系统一个瓶颈,所以针对这种情况,改进了模型,如图所示:
由图可以看出,在原有 Reactor 模型上,同时将 Reactor 拆分成 mainReactor 和 subReactor 。其中 mainReactor 主要负责客户端的请求连接,subReactor 通过一个线程池进行支撑,主要负责网络读写,因为线程池的原因,可以进行多线程并发读写,大大提升了网络读写的效率。业务处理也是通过线程池进行。通过这种方式,可以进行百万级别的连接。
对于上述的 Reactor 模型,主要有三个核心需要实现:Acceptor,Reactor 和 Handler。具体实现代码如下:
public class Reactor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException{
serverSocket = ServerSocketChannel.open();//创建服务端的ServerSocketChannel
serverSocket.configureBlocking(false);//设置为非阻塞模式
selector = Selector.open();//创建一个selector选择器
SelectionKey key = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
serverSocket.bind(new InetSocketAddress(port));//绑定服务端端口
key.attach(new Acceptor(serverSocket));//为服务端Channel绑定一个Acceptor
}
@Override
public void run(){
try{
while(!Thread.interrupted()){
selector.select();//服务端使用一个线程不停接收连接请求
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> itetrator = keys.iterator();
while(iterator.hasNext()){
dispatch(iterator.next());
iterator.remove();
}
selector.selectNow();
}
}catch(IOException e){
e.printStackTrace();
}
}
private void dispatch(SelectionKey key) throws IOException{
//这里的attachment也即前面的为服务端Channel绑定的Acceptor,调用其run()方法进行分发
Runnable attachment = (Runable)key.attachment();
attachment.run();
}
}
这里Reactor首先开启了一个ServerSocketChannel,然后将其绑定到指定的端口,并且注册到了一个多路复用器上。接着在一个线程中,其会在多路复用器上等待客户端连接。当有客户端连接到达后,Reactor就会将其派发给一个Acceptor,由该Acceptor专门进行客户端连接的获取。下面我们继续看一下Acceptor的代码:
public class Acceptor implements Runnable{
private final ExecuteorService executor = Exxcutors.newFixedThreadPool(20);
private final ServerSocketChannel serverSocket;
public Acceptor(ServerSocketChannel serverSocket){
this.serverSocket = serverSocket;
}
@Override
public void run(){
try{
SocketChannel channel = serverSocket.accept();
if(null != channel){
executor.execute(new Handler(channel));
}
}catch(IOException e){
e.printStackTrace();
}
}
}
这里可以看到,在Acceptor获取到客户端连接之后,其就将其交由线程池进行网络读写了,而这里的主线程只是不断监听客户端连接事件。下面我们看看Handler的具体逻辑:
public class Handler implements Runnable{
private volatile static Selector selector;
private final SocketChannel channel;
private SelectionKey key;
private volatile ByteBuffer input = ByteBuffer.allocate(1024);
private volatile ByteBuffer output = ByteBuffer.allocate(1024);
public Handle(SocketChannel channel) throws IOException{
this.channel = channel;
channel.configureBlocking(false);//设置客户端连接为非阻塞模式
selector = Selector.open();//为客户端创建一个选择器
key = channel.register(selector,SelectionKey.OP_READ);//注册客户端Channel的读事件
}
@Override
public void run(){
try{
while(selector.isOpen() && channel.isOpen()){
Set<SelectionKey> keys = select();//等待客户端事件发生
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
//如果当前是读事件,则读取数据
if(key.isReadable()){
read(key);
}else if(key.isWritable()){
write(key)
}
}
}
}catch(IOException e){
e.printStackTrace();
}
}
//读取客户端发送的数据
private void read(SelectionKey key) throws IOException{
channel.read(input);
if(input.position() == 0){
return ;
}
input.flip();
process();//对读数据进行业务处理
input.clear();
key.interstOps(SelectionKey.OP_WRITE);//读取完成后监听写入事件
}
private void write(SelectionKey key) throws IOException{
output.flip();
if(channel.isOpen()){
channel.write(output);//当有写入事件时,将业务处理的结果写入到客户端Channel中
key.channel();
channel.close();
output.clear();
}
}
//进行业务处理,并且获取处理结果。本质上,基于Reactor模型,如果这里成为处理瓶颈,则将处理过程放入到线程池里面即可,并且使用一个Future获取处理结果,最后写入到客户端Channel中
private void process(){
byte[] bytes = new byte[input.remaining()];
input.get(bytes);
String message = new String(bytes,CharsetUtil.UTF_8);
System.out.println("receive message from client: \n" +message);
output.put("hello client".getBytes());
}
}
在 Handler 中,主要进行的就是每个客户端 Channel 创建一个 Selector,并且监听该 Channel 的网络读写事件。当有事件到达时,进行数据的读写,而业务操作交友具体的业务线程池处理。
之前主要介绍了阻塞式同步 IO,非阻塞式同步 IO,多路复用 IO 这三种 IO 模型。而异步 IO 是采用“订阅-通知”模式,即应用程序向操作系统注册 IO 监听,然后继续做自己的事情。当操作系统发生 IO 事件,并且准备好数据后,主动通知应用程序,触发相应的函数:
和同步 IO 一样,异步 IO 也是由操作系统进行支持的。Windows 系统提供了一种异步 IO 技术:IOCP(I/O Completion Port,I/O 完成端口);
Linux 下由于没有这种异步 IO 技术,所以使用的是 epoll(上文介绍过的一种多路复用 IO 技术的实现)对异步 IO 进行模拟。
以上结构主要是说明 JAVA AIO 中类设计和操作系统的相关性。
上述所有代码仓库地址:https://github.com/z1gui/netty_io
参考资料: