您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 
 订阅
Redis的IO多路复用原理解析(上)
 

   次浏览      
2024-3-4
 
编辑推荐:
本文主要介绍了.Redis的IO多路复用原理解析相关知识。 希望对您的学习有所帮助 。
本文来自于开发者社区,由火龙果软件Linda编辑、推荐。

多路复用要解决的问题

并发多客户端连接,在多路复用之前最简单和典型的方案:同步阻塞网络IO模型

这种模式的特点就是用一个进程来处理一个网络连接(一个用户请求),比如一段典型的示例代码如下。

直接调用 recv 函数从一个 socket 上读取数据。

int main()
{
recv(sock, …) //从用户角度来看非常简单,一个recv一用,要接收的数据就到我们手里了。
}

我们来总结一下这种方式:

优点就是这种方式非常容易让人理解,写起代码来非常的自然,符合人的直线型思维。

缺点就是性能差,每个用户请求到来都得占用一个进程来处理,来一个请求就要分配一个进程跟进处理,

类似一个学生配一个老师,一位患者配一个医生,可能吗?进程是一个很笨重的东西。一台服务器上创建不了多少个进程。

结论

进程在 Linux 上是一个开销不小的家伙,先不说创建,光是上下文切换一次就得几个微秒。所以为了高效地对海量用户提供服务,必须要让一个进程能同时处理很多个 tcp 连接才行。现在假设一个进程保持了 10000 条连接,那么如何发现哪条连接上有数据可读了、哪条连接可写了 ?

我们当然可以采用循环遍历的方式来发现 IO 事件,但这种方式太低级了。

我们希望有一种更高效的机制,在很多连接中的某条上有 IO 事件发生的时候直接快速把它找出来。

其实这个事情 Linux 操作系统已经替我们都做好了,它就是我们所熟知的 IO 多路复用机制。

这里的复用指的就是对进程的复用

IO多路复用模型

是什么?

IO:网络IO

多路:多个客户端连接(连接就是套接字描述符,即socket或者channel),指的是多条TCP连接

复用:用一个进程来处理多条的连接,使用单进程就可以实现同时处理多个客户端的连接

一句话:使用单进程就实现了多个客户端的连接,IO多路复用类似于一个接口,规范、落地实现,可以从select->poll->epoll三个阶段来实现

Redis单线程如何处理那么多并发客户端连接,为什么单线程,为什么快?

Redis的IO多路复用

Redis利用epoll来实现IO多路复用,将连接信息和事件放到队列中,一次放到文件事件分派器,事件分派器将事件分发给事件处理器。

Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以 I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而 I/O 多路复用就是为了解决这个问题而出现

所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要 select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。

Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符)

Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:

多个套接字、

IO多路复用程序、

文件事件分派器、

事件处理器。

因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型

Redis设计与实现

从Redis6开始,将网络数据读写、请求协议解析通过多个IO线程的来处理 ,对于真正的命令执行来说,仍然使用单线程操作,一举两得,便宜占尽。

Unix网络编程中几种常用模型

阻塞IO

非阻塞IO

IO多路复用

从吃米线开始,读读read

上午开会,错过了公司食堂的饭点, 中午就和公司的首席架构师一起去楼下的米线店去吃米线。我们到了一看,果然很多人在排队。

架构师马上发话了:嚯,请求排队啊!你看这位收银点菜的,像不像nginx的反向代理?只收请求,不处理,把请求都发给后厨去处理。

我们交了钱,拿着号离开了点餐收银台,找了个座位坐下等餐。

架构师:你看,这就是异步处理,我们下了单就可以离开等待,米线做好了会通过小喇叭“回调”我们去取餐;

如果同步处理,我们就得在收银台站着等餐,后面的请求无法处理,客户等不及肯定会离开了。

接下里架构师盯着手中的纸质号牌。

架构师:你看,这个纸质号牌在后厨“服务器”那里也有,这不就是表示会话的ID吗?

有了它就可以把大家给区分开,就不会把我的排骨米线送给别人了。过了一会, 排队的人越来越多,已经有人表示不满了,可是收银员已经满头大汗,忙到极致了。

架构师:你看他这个系统缺乏弹性扩容, 现在这么多人,应该增加收银台,可以没有其他收银设备,老板再着急也没用。

老板看到在收银这里帮不了忙,后厨的订单也累积得越来越多, 赶紧跑到后厨亲自去做米线去了。

架构师又发话了:幸亏这个系统的后台有并行处理能力,可以随意地增加资源来处理请求(做米线)。

我说:他就这点儿资源了,除了老板没人再会做米线了。

不知不觉,我们等了20分钟, 但是米线还没上来。

架构师:你看,系统的处理能力达到极限,超时了吧。

这时候收银台前排队的人已经不多了,但是还有很多人在等米线。

老板跑过来让这个打扫卫生的去收银,让收银小妹也到后厨帮忙。打扫卫生的做收银也磕磕绊绊的,没有原来的小妹灵活。

架构师:这就叫服务降级,为了保证米线的服务,把别的服务都给关闭了。

又过了20分钟,后厨的厨师叫道:237号, 您点的排骨米线没有排骨了,能换成番茄的吗?

架构师低声对我说:瞧瞧, 人太多, 系统异常了。然后他站了起来:不行,系统得进行补偿操作:退费。

说完,他拉着我,饿着肚子,头也不回地走了。

同步

调用者要一直等待调用结果的通知后才能进行后续的执行,现在就要,我可以等,等出结果为止

异步

指被调用方先返回应答让调用者返回,然后再计算调用结果,计算完最终结果后再通知并返回给调用方。

异步调用要想获得结果一般通过回调

同步和异步的理解

同步、异步的讨论对象是被调用者(服务提供者),重点在于获得调用结果的消息通知方式上

阻塞

调用方一直在等待而且别的事情什么都不做,当前进程线程被挂起,啥也不干

非阻塞

调用在发出去后,调用方先去忙别的事情,不会阻塞当前进程线程,而会立即返回

阻塞和非阻塞的理解

阻塞和非阻塞的讨论对象是调用者(服务请求者)重点在于等消息时候的行为,带哦用着是否能干其它事

JAVA实践

背景

一个redisServer + 2个Client

BIO

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。所以,BIO的特点就是在IO执行的两个阶段都被block了。

accept监听

RedisServer

public class RedisServer
{
    public static void main(String[] args) throws IOException
    {
        byte[] bytes = new byte[1024];

        ServerSocket serverSocket = new ServerSocket(6379);

        while(true)
        {
            System.out.println("-----111 等待连接");
            Socket socket = serverSocket.accept();
            System.out.println("-----222 成功连接");
        }
    }
}

 

RedisClient01

public class RedisClient01
{
    public static void main(String[] args) throws IOException
    {
        System.out.println("------RedisClient01 start");
        Socket socket = new Socket("127.0.0.1", 6379);
    }
}

 

RedisClient02

public class RedisClient02
{
    public static void main(String[] args) throws IOException
    {
        System.out.println("------RedisClient02 start");
        Socket socket = new Socket("127.0.0.1", 6379);
    }
}

Read读取

先启动RedisServerBIO,再启动RedisClient01验证后再启动02客户端

RedisServerBIO

public class RedisServerBIO
{
    public static void main(String[] args) throws IOException
    {

        ServerSocket serverSocket = new ServerSocket(6379);

        while(true)
        {
            System.out.println("-----111 等待连接");
            Socket socket = serverSocket.accept();//阻塞1 ,等待客户端连接
            System.out.println("-----222 成功连接");

            InputStream inputStream = socket.getInputStream();
            int length = -1;
            byte[] bytes = new byte[1024];
            System.out.println("-----333 等待读取");
            while((length = inputStream.read(bytes)) != -1)//阻塞2 ,等待客户端发送数据
            {
                System.out.println("-----444 成功读取"+new String(bytes,0,length));
                System.out.println("====================");
                System.out.println();
            }
            inputStream.close();
            socket.close();
        }
    }
}

 

RedisClient01

public class RedisClient01
{
    public static void main(String[] args) throws IOException
    {
        Socket socket = new Socket("127.0.0.1",6379);
        OutputStream outputStream = socket.getOutputStream();

        //socket.getOutputStream().write("RedisClient01".getBytes());

        while(true)
        {
            Scanner scanner = new Scanner(System.in);
            String string = scanner.next();
            if (string.equalsIgnoreCase("quit")) {
                break;
            }
            socket.getOutputStream().write(string.getBytes());
            System.out.println("------input quit keyword to finish......");
        }
        outputStream.close();
        socket.close();
    }
}

 

RedisClient02

public class RedisClient02
{
    public static void main(String[] args) throws IOException
    {
        Socket socket = new Socket("127.0.0.1",6379);
        OutputStream outputStream = socket.getOutputStream();

        //socket.getOutputStream().write("RedisClient01".getBytes());

        while(true)
        {
            Scanner scanner = new Scanner(System.in);
            String string = scanner.next();
            if (string.equalsIgnoreCase("quit")) {
                break;
            }
            socket.getOutputStream().write(string.getBytes());
            System.out.println("------input quit keyword to finish......");
        }
        outputStream.close();
        socket.close();
    }
}

 

存在的问题

上面的模型存在很大的问题,如果客户端与服务端建立了连接,如果这个连接的客户端迟迟不发数据,程就会一直堵塞在read()方法上,这样其他客户端也不能进行连接,也就是一次只能处理一个客户端,对客户很不友好

知道问题所在了,请问如何解决??

解决

利用多线程

只要连接了一个socket,操作系统分配一个线程来处理,这样read()方法堵塞在每个具体线程上而不堵塞主线程,就能操作多个socket了,哪个线程中的socket有数据,就读哪个socket,各取所需,灵活统一。

程序服务端只负责监听是否有客户端连接,使用 accept() 阻塞

客户端1连接服务端,就开辟一个线程(thread1)来执行 read() 方法,程序服务端继续监听

客户端2连接服务端,也开辟一个线程(thread2)来执行 read() 方法,程序服务端继续监听

客户端3连接服务端,也开辟一个线程(thread3)来执行 read() 方法,程序服务端继续监听

。。。。。。

任何一个线程上的socket有数据发送过来,read()就能立马读到,cpu就能进行处理。

RedisServerBIOMultiThread

public class RedisServerBIOMultiThread
{
    public static void main(String[] args) throws IOException
    {
        ServerSocket serverSocket = new ServerSocket(6379);

        while(true)
        {
            //System.out.println("-----111 等待连接");
            Socket socket = serverSocket.accept();//阻塞1 ,等待客户端连接
            //System.out.println("-----222 成功连接");

            new Thread(() -> {
                try {
                    InputStream inputStream = socket.getInputStream();
                    int length = -1;
                    byte[] bytes = new byte[1024];
                    System.out.println("-----333 等待读取");
                    while((length = inputStream.read(bytes)) != -1)//阻塞2 ,等待客户端发送数据
                    {
                        System.out.println("-----444 成功读取"+new String(bytes,0,length));
                        System.out.println("====================");
                        System.out.println();
                    }
                    inputStream.close();
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            },Thread.currentThread().getName()).start();

            System.out.println(Thread.currentThread().getName());

        }
    }
}

 

RedisClient01

public class RedisClient01
{
    public static void main(String[] args) throws IOException
    {
        Socket socket = new Socket("127.0.0.1",6379);
        OutputStream outputStream = socket.getOutputStream();

        //socket.getOutputStream().write("RedisClient01".getBytes());

        while(true)
        {
            Scanner scanner = new Scanner(System.in);
            String string = scanner.next();
            if (string.equalsIgnoreCase("quit")) {
                break;
            }
            socket.getOutputStream().write(string.getBytes());
            System.out.println("------input quit keyword to finish......");
        }
        outputStream.close();
        socket.close();
    }
}

 

RedisClient02

public class RedisClient02
{
    public static void main(String[] args) throws IOException
    {
        Socket socket = new Socket("127.0.0.1",6379);
        OutputStream outputStream = socket.getOutputStream();

        //socket.getOutputStream().write("RedisClient01".getBytes());

        while(true)
        {
            Scanner scanner = new Scanner(System.in);
            String string = scanner.next();
            if (string.equalsIgnoreCase("quit")) {
                break;
            }
            socket.getOutputStream().write(string.getBytes());
            System.out.println("------input quit keyword to finish......");
        }
        outputStream.close();
        socket.close();
    }
}

 

存在问题

多线程模型

每来一个客户端,就要开辟一个线程,如果来1万个客户端,那就要开辟1万个线程。

在操作系统中用户态不能直接开辟线程,需要调用内核来创建的一个线程,这其中还涉及到用户状态的切换(上下文的切换),十分耗资源。

知道问题所在了,请问如何解决??

第一个办法:使用线程池

这个在客户端连接少的情况下可以使用,但是用户量大的情况下,你不知道线程池要多大,太大了内存可能不够,也不可行。

第二个办法:NIO(非阻塞式IO)方式

因为read()方法堵塞了,所有要开辟多个线程,如果什么方法能使read()方法不堵塞,这样就不用开辟多个线程了,这就用到了另一个IO模型,NIO(非阻塞式IO)

tomcat7之前就是用BIO多线程来解决多连接

两个痛点

accept

read

在阻塞式IO模型中,应用程序在从调用recvfrom开始到它返回有数据报准备好这段时间是阻塞的,recvfrom返回成功后,应用进程才能开始处理数据报。

思考:每个线程分配一个连接,必然会产生多个,既然是多个socket连接必然需要放入容器,纳入统一管理

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训

最新活动计划
SysML和EA系统设计与建模 1-16[北京]
企业架构师(业务、应用、技术) 1-23[北京]
大语言模型(LLM)Fine Tune 2-22[在线]
MBSE(基于模型的系统工程)2-27[北京]
OpenGauss数据库调优实践 3-11[北京]
UAF架构体系与实践 3-25[北京]
 
 
最新文章
InfluxDB概念和基本操作
InfluxDB TSM存储引擎之数据写入
深度漫谈数据系统架构——Lambda architecture
Lambda架构实践
InfluxDB TSM存储引擎之数据读取
最新课程
Oracle数据库性能优化、架构设计和运行维护
并发、大容量、高性能数据库设计与优化
NoSQL数据库(原理、应用、最佳实践)
企业级Hadoop大数据处理最佳实践
Oracle数据库性能优化最佳实践
成功案例
某金融公司 Mysql集群与性能优化
北京 并发、大容量、高性能数据库设计与优化
知名某信息通信公司 NoSQL缓存数据库技术
北京 oracle数据库SQL优化
中国移动 IaaS云平台-主流数据库及存储技术