编辑推荐: |
本文主要介绍了.Redis的IO多路复用原理解析相关知识。 希望对您的学习有所帮助
。
本文来自于开发者社区,由火龙果软件Linda编辑、推荐。 |
|
多路复用要解决的问题
并发多客户端连接,在多路复用之前最简单和典型的方案:同步阻塞网络IO模型
这种模式的特点就是用一个进程来处理一个网络连接(一个用户请求),比如一段典型的示例代码如下。
直接调用 recv 函数从一个 socket 上读取数据。
int main()
{
…
recv(sock, …) //从用户角度来看非常简单,一个recv一用,要接收的数据就到我们手里了。
}
|
我们来总结一下这种方式:
优点就是这种方式非常容易让人理解,写起代码来非常的自然,符合人的直线型思维。
缺点就是性能差,每个用户请求到来都得占用一个进程来处理,来一个请求就要分配一个进程跟进处理,
类似一个学生配一个老师,一位患者配一个医生,可能吗?进程是一个很笨重的东西。一台服务器上创建不了多少个进程。
结论
进程在 Linux 上是一个开销不小的家伙,先不说创建,光是上下文切换一次就得几个微秒。所以为了高效地对海量用户提供服务,必须要让一个进程能同时处理很多个
tcp 连接才行。现在假设一个进程保持了 10000 条连接,那么如何发现哪条连接上有数据可读了、哪条连接可写了
?
我们当然可以采用循环遍历的方式来发现 IO 事件,但这种方式太低级了。
我们希望有一种更高效的机制,在很多连接中的某条上有 IO 事件发生的时候直接快速把它找出来。
其实这个事情 Linux 操作系统已经替我们都做好了,它就是我们所熟知的 IO 多路复用机制。
这里的复用指的就是对进程的复用
IO多路复用模型
是什么?
IO:网络IO
多路:多个客户端连接(连接就是套接字描述符,即socket或者channel),指的是多条TCP连接
复用:用一个进程来处理多条的连接,使用单进程就可以实现同时处理多个客户端的连接
一句话:使用单进程就实现了多个客户端的连接,IO多路复用类似于一个接口,规范、落地实现,可以从select->poll->epoll三个阶段来实现
Redis单线程如何处理那么多并发客户端连接,为什么单线程,为什么快?
Redis的IO多路复用
Redis利用epoll来实现IO多路复用,将连接信息和事件放到队列中,一次放到文件事件分派器,事件分派器将事件分发给事件处理器。
Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以
I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而
I/O 多路复用就是为了解决这个问题而出现
所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要
select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符)
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:
多个套接字、
IO多路复用程序、
文件事件分派器、
事件处理器。
因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型
Redis设计与实现
从Redis6开始,将网络数据读写、请求协议解析通过多个IO线程的来处理
,对于真正的命令执行来说,仍然使用单线程操作,一举两得,便宜占尽。
Unix网络编程中几种常用模型
阻塞IO
非阻塞IO
IO多路复用
从吃米线开始,读读read
上午开会,错过了公司食堂的饭点, 中午就和公司的首席架构师一起去楼下的米线店去吃米线。我们到了一看,果然很多人在排队。
架构师马上发话了:嚯,请求排队啊!你看这位收银点菜的,像不像nginx的反向代理?只收请求,不处理,把请求都发给后厨去处理。
我们交了钱,拿着号离开了点餐收银台,找了个座位坐下等餐。
架构师:你看,这就是异步处理,我们下了单就可以离开等待,米线做好了会通过小喇叭“回调”我们去取餐;
如果同步处理,我们就得在收银台站着等餐,后面的请求无法处理,客户等不及肯定会离开了。
接下里架构师盯着手中的纸质号牌。
架构师:你看,这个纸质号牌在后厨“服务器”那里也有,这不就是表示会话的ID吗?
有了它就可以把大家给区分开,就不会把我的排骨米线送给别人了。过了一会, 排队的人越来越多,已经有人表示不满了,可是收银员已经满头大汗,忙到极致了。
架构师:你看他这个系统缺乏弹性扩容, 现在这么多人,应该增加收银台,可以没有其他收银设备,老板再着急也没用。
老板看到在收银这里帮不了忙,后厨的订单也累积得越来越多, 赶紧跑到后厨亲自去做米线去了。
架构师又发话了:幸亏这个系统的后台有并行处理能力,可以随意地增加资源来处理请求(做米线)。
我说:他就这点儿资源了,除了老板没人再会做米线了。
不知不觉,我们等了20分钟, 但是米线还没上来。
架构师:你看,系统的处理能力达到极限,超时了吧。
这时候收银台前排队的人已经不多了,但是还有很多人在等米线。
老板跑过来让这个打扫卫生的去收银,让收银小妹也到后厨帮忙。打扫卫生的做收银也磕磕绊绊的,没有原来的小妹灵活。
架构师:这就叫服务降级,为了保证米线的服务,把别的服务都给关闭了。
又过了20分钟,后厨的厨师叫道:237号, 您点的排骨米线没有排骨了,能换成番茄的吗?
架构师低声对我说:瞧瞧, 人太多, 系统异常了。然后他站了起来:不行,系统得进行补偿操作:退费。
说完,他拉着我,饿着肚子,头也不回地走了。
同步
调用者要一直等待调用结果的通知后才能进行后续的执行,现在就要,我可以等,等出结果为止
异步
指被调用方先返回应答让调用者返回,然后再计算调用结果,计算完最终结果后再通知并返回给调用方。
异步调用要想获得结果一般通过回调
同步和异步的理解
同步、异步的讨论对象是被调用者(服务提供者),重点在于获得调用结果的消息通知方式上
阻塞
调用方一直在等待而且别的事情什么都不做,当前进程线程被挂起,啥也不干
非阻塞
调用在发出去后,调用方先去忙别的事情,不会阻塞当前进程线程,而会立即返回
阻塞和非阻塞的理解
阻塞和非阻塞的讨论对象是调用者(服务请求者)重点在于等消息时候的行为,带哦用着是否能干其它事
JAVA实践
背景
一个redisServer + 2个Client
BIO
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。所以,BIO的特点就是在IO执行的两个阶段都被block了。
accept监听
RedisServer
public class RedisServer
{
public static void main(String[] args) throws IOException
{
byte[] bytes = new byte[1024];
ServerSocket serverSocket = new ServerSocket(6379);
while(true)
{
System.out.println("-----111 等待连接");
Socket socket = serverSocket.accept();
System.out.println("-----222 成功连接");
}
}
}
|
RedisClient01
public class RedisClient01
{
public static void main(String[] args) throws IOException
{
System.out.println("------RedisClient01 start");
Socket socket = new Socket("127.0.0.1", 6379);
}
}
|
RedisClient02
public class RedisClient02
{
public static void main(String[] args) throws IOException
{
System.out.println("------RedisClient02 start");
Socket socket = new Socket("127.0.0.1", 6379);
}
}
|
Read读取
先启动RedisServerBIO,再启动RedisClient01验证后再启动02客户端
RedisServerBIO
public class RedisServerBIO
{
public static void main(String[] args) throws IOException
{
ServerSocket serverSocket = new ServerSocket(6379);
while(true)
{
System.out.println("-----111 等待连接");
Socket socket = serverSocket.accept();//阻塞1 ,等待客户端连接
System.out.println("-----222 成功连接");
InputStream inputStream = socket.getInputStream();
int length = -1;
byte[] bytes = new byte[1024];
System.out.println("-----333 等待读取");
while((length = inputStream.read(bytes)) != -1)//阻塞2 ,等待客户端发送数据
{
System.out.println("-----444 成功读取"+new String(bytes,0,length));
System.out.println("====================");
System.out.println();
}
inputStream.close();
socket.close();
}
}
}
|
RedisClient01
public class RedisClient01
{
public static void main(String[] args) throws IOException
{
Socket socket = new Socket("127.0.0.1",6379);
OutputStream outputStream = socket.getOutputStream();
//socket.getOutputStream().write("RedisClient01".getBytes());
while(true)
{
Scanner scanner = new Scanner(System.in);
String string = scanner.next();
if (string.equalsIgnoreCase("quit")) {
break;
}
socket.getOutputStream().write(string.getBytes());
System.out.println("------input quit keyword to finish......");
}
outputStream.close();
socket.close();
}
}
|
RedisClient02
public class RedisClient02
{
public static void main(String[] args) throws IOException
{
Socket socket = new Socket("127.0.0.1",6379);
OutputStream outputStream = socket.getOutputStream();
//socket.getOutputStream().write("RedisClient01".getBytes());
while(true)
{
Scanner scanner = new Scanner(System.in);
String string = scanner.next();
if (string.equalsIgnoreCase("quit")) {
break;
}
socket.getOutputStream().write(string.getBytes());
System.out.println("------input quit keyword to finish......");
}
outputStream.close();
socket.close();
}
}
|
存在的问题
上面的模型存在很大的问题,如果客户端与服务端建立了连接,如果这个连接的客户端迟迟不发数据,程就会一直堵塞在read()方法上,这样其他客户端也不能进行连接,也就是一次只能处理一个客户端,对客户很不友好
知道问题所在了,请问如何解决??
解决
利用多线程
只要连接了一个socket,操作系统分配一个线程来处理,这样read()方法堵塞在每个具体线程上而不堵塞主线程,就能操作多个socket了,哪个线程中的socket有数据,就读哪个socket,各取所需,灵活统一。
程序服务端只负责监听是否有客户端连接,使用 accept() 阻塞
客户端1连接服务端,就开辟一个线程(thread1)来执行 read() 方法,程序服务端继续监听
客户端2连接服务端,也开辟一个线程(thread2)来执行 read() 方法,程序服务端继续监听
客户端3连接服务端,也开辟一个线程(thread3)来执行 read() 方法,程序服务端继续监听
。。。。。。
任何一个线程上的socket有数据发送过来,read()就能立马读到,cpu就能进行处理。
RedisServerBIOMultiThread
public class RedisServerBIOMultiThread
{
public static void main(String[] args) throws IOException
{
ServerSocket serverSocket = new ServerSocket(6379);
while(true)
{
//System.out.println("-----111 等待连接");
Socket socket = serverSocket.accept();//阻塞1 ,等待客户端连接
//System.out.println("-----222 成功连接");
new Thread(() -> {
try {
InputStream inputStream = socket.getInputStream();
int length = -1;
byte[] bytes = new byte[1024];
System.out.println("-----333 等待读取");
while((length = inputStream.read(bytes)) != -1)//阻塞2 ,等待客户端发送数据
{
System.out.println("-----444 成功读取"+new String(bytes,0,length));
System.out.println("====================");
System.out.println();
}
inputStream.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
},Thread.currentThread().getName()).start();
System.out.println(Thread.currentThread().getName());
}
}
}
|
RedisClient01
public class RedisClient01
{
public static void main(String[] args) throws IOException
{
Socket socket = new Socket("127.0.0.1",6379);
OutputStream outputStream = socket.getOutputStream();
//socket.getOutputStream().write("RedisClient01".getBytes());
while(true)
{
Scanner scanner = new Scanner(System.in);
String string = scanner.next();
if (string.equalsIgnoreCase("quit")) {
break;
}
socket.getOutputStream().write(string.getBytes());
System.out.println("------input quit keyword to finish......");
}
outputStream.close();
socket.close();
}
}
|
RedisClient02
public class RedisClient02
{
public static void main(String[] args) throws IOException
{
Socket socket = new Socket("127.0.0.1",6379);
OutputStream outputStream = socket.getOutputStream();
//socket.getOutputStream().write("RedisClient01".getBytes());
while(true)
{
Scanner scanner = new Scanner(System.in);
String string = scanner.next();
if (string.equalsIgnoreCase("quit")) {
break;
}
socket.getOutputStream().write(string.getBytes());
System.out.println("------input quit keyword to finish......");
}
outputStream.close();
socket.close();
}
}
|
存在问题
多线程模型
每来一个客户端,就要开辟一个线程,如果来1万个客户端,那就要开辟1万个线程。
在操作系统中用户态不能直接开辟线程,需要调用内核来创建的一个线程,这其中还涉及到用户状态的切换(上下文的切换),十分耗资源。
知道问题所在了,请问如何解决??
第一个办法:使用线程池
这个在客户端连接少的情况下可以使用,但是用户量大的情况下,你不知道线程池要多大,太大了内存可能不够,也不可行。
第二个办法:NIO(非阻塞式IO)方式
因为read()方法堵塞了,所有要开辟多个线程,如果什么方法能使read()方法不堵塞,这样就不用开辟多个线程了,这就用到了另一个IO模型,NIO(非阻塞式IO)
tomcat7之前就是用BIO多线程来解决多连接
两个痛点
accept
read
在阻塞式IO模型中,应用程序在从调用recvfrom开始到它返回有数据报准备好这段时间是阻塞的,recvfrom返回成功后,应用进程才能开始处理数据报。
思考:每个线程分配一个连接,必然会产生多个,既然是多个socket连接必然需要放入容器,纳入统一管理 |