小步慢跑,聊聊Python中的多线程
在开始今天的话题之前,简单的来看有关Python的体系结构。为了方便起见我做一张导图,让大家有个宏观的认识。
今天本来准备全面的聊聊有关高性能并发这个话题来着,但是周末马上要来了啊。所以我就取了其中的一点来介绍,关于其他的方面,有兴趣的小伙伴可以和我交流。谈高效并发,往往脱离不了以下三种方案:
1.进程:每个逻辑控制流都是一个进程,由内核来调度和维护。因为进程有独立的虚拟地址空间,想要和其他控制流通信必须依靠显示的进程间通信,即我们所说的IPC机制
2.线程:线程应该是我们最为熟知的。它本质是运行在一个单一进程上下文中的逻辑流,由内核进行调度。
3. I/O多路复用:应用程序在一个进程的上下文中显式地调度他们自己的逻辑流.逻辑流被模型化为状态机,数据到达文件描述符之后,主程序显式地从一个状态转换为另一个状态。因为程序都是以一个单独的进程,所以所有的流都共享同一个地址空间。基本的思路就是使用select函数要求内核挂起进程,只有一个或多个I/O事件发生后,才将控制权返回给应用程序。
看起来令人难以理解,但幸运的是Python中针对这三方面都提供了响应的支持,简化了我们的操作。那今天咱就聊聊其中的一点–线程。为什么选择线程呢?一方面考虑到大部分人都有线程这个概念,另一方面考虑相比进程线程更轻量级,相比协程,线程更易于理解。进程和线程之间的关系可以用衣服最简单的图来表示:
线程的状态
任何一门支持线程的语言都可以具备以下几种运行状态,无论是你做Java、Python还是C,首先来看下面一张图:
在这里我简单来解释以下这几种状态的含义:
新建:使用线程的第一步就是创建线程,创建后的线程只是进入可执行的状态,也就是Runnable
Runnable:进入此状态的线程还并未开始运行,一旦CPU分配时间片给这个线程后,该线程才正式的开始运行
Running:线程正式开始运行,在运行过程中线程可能会进入阻塞的状态,即Blocked
Blocked:在该状态下,线程暂停运行,解除阻塞后,线程会进入Runnable状态,等待CPU再次分配时间片给它
结束:线程方法执行完毕或者因为异常终止返回
其中最复杂的是线程从Running进入Blocked状态,通常有三种情况:
睡眠:线程主动调用sleep()或join()方法后
等待:线程中调用wait()方法,此时需要有其他线程通过notify()方法来唤醒
同步:线程中获取线程锁,但是因为资源已经被其他线程占用时
到现在,我们对线程有个基本的概念,光说不练假把式,下面我们就通过是三个小的示例来聊聊线程的使用以及线程中最终的两个概念:同步和通信。
线程简单使用
Python当中要实现多线程有两种方式:一种是使用低级的_thread模块,另一种高级threading模块,相比而言,我推荐使用threading模块。另外我只推荐用于多线程用于处理有关I/O的操作,不然反而造成性能下降。在开始之前呢,先来了解下threading模块给我提供哪些常用的类:
Thread,Lock,RLock,Condition,Event,Semaphore,Timer和Local。这几个类可谓开发多线程中的神兵利器。但是介于篇幅,咱就不展开讲了。
我们直接来看如何使用多线程,这才是至关重要的,有句老话是这么说的:要想让小孩子跑得先让他学会走。我们这就走两步:
import threading def run(number): print(threading.currentThread().getName() + '\n') print(number) if name == '__main': for i in range(10): #创建线程 mythread = threading.Thread(target=run, args=(i,)) mythread.start() |
多线程的创建和运行都是套路啊,写的多了自然熟了,来看看运行结果:
Thread-1,value=0 Thread-2,value=1 Thread-3,value=2 Thread-4,value=3 Thread-5,value=4 Thread-6,value=5 Thread-7,value=6 Thread-8,value=7 Thread-9,value=8 Thread-10,value=9 |
同步与通信
多线程开发中最难的问题不是如何使用,而是如何写出正确高效的代码,要写出正确而高效的代码必须要理解两个很重要的概念:同步和通信。所谓的通信指的是线程之间如何交换消息,而同步则用于控制不同线程之间操作发生的相对顺序。简单点说同步就是控制多个线程访问代码的顺序,通信就是线程之间如何传递消息。在python中实现同步的最简单的方案就是使用锁机制,实现通信最简单的方案就是Event。下面就来看看这两者的具体使用。
线程同步
当多个线程同时访问同一资源的时候,就会发生竞争,这有点像很多个男性都在追同一个妹纸一样,结果是不可预期的。因此有必要使用某种机制来保证每个男生都有机会和女生相处,这有点像将小姑娘放在一间房子里,然后进去的男生锁上门。下一个男生要想进去。必须等待上一个男生出来。只不过在这里叫线程锁。
Python的threading模块为我们提供了线程锁功能,在threading中提供RLock对象,RLock对象内部维护着一个Lock对象,它是一种可重入锁。对于Lock对象而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起线程。这会导致Lock对象永远不会release,使得线程死锁。而RLock对象允许一个线程多次对其进行acquire操作,因为在其内部通过一个counter变量维护着线程acquire的次数。而且每一次的acquire操作必须有一个release操作与之对应,在所有的release操作完成之后,别的线程才能申请该RLock对象。
通过锁机制,最终多线程访问共享资源的过程就类似以下:
上图其实演示了在使用锁来解决线程同步最本质的一点:将所有线程对共享资源的读写操作串行化。
import threading
mylock = threading.RLock()
num = 0
class WorkThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.t_name = name
def run(self):
global num
while True:
mylock.acquire()
print('\n%s locked, number: %d' % (self.t_name,
num))
if num >= 4:
mylock.release()
print('\n%s released, number: %d' % (self.t_name,
num))
break
num += 1
print('\n%s released, number: %d' % (self.t_name,
num))
mylock.release()
def test():
thread1 = WorkThread('A-Worker')
thread2 = WorkThread('B-Worker')
thread1.start()
thread2.start()
if __name__ == '__main__':
test()
|
来看看运行结果:
A-Worker locked, number: 0
A-Worker released, number: 1
A-Worker locked, number: 1
A-Worker released, number: 2
A-Worker locked, number: 2
A-Worker released, number: 3
A-Worker locked, number: 3
A-Worker released, number: 4
A-Worker locked, number: 4
A-Worker released, number: 4
B-Worker locked, number: 4
B-Worker released, number: 4 |
有些同学会问除了Lock和RLock还有其他的方式来实现类似的效果么?当然,比如Condition和Semaphore都有类似的功能,其中Condition是在Lock/RLock的基础上再次包装而成,而Semaphore的原理和操作系统的PV操作一致。之所以不细说的原因在于基本他们的基本使用和原理并无本质区别。我个人也一直认为越复杂的东西背后越是有简单的原理,当然欢迎有兴趣的同学和我进行探讨。
线程通信
在很多时候,我们需要在线程间传递消息,也叫作线程通信。Python中提供的Event就是最简单的通信机制之一。使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。先来看看Event中一些常用的方法:
来看个简单示例,我们暂且假设你有6个妹纸需要叫她们起床,这时候你该怎么做呢?
import threading import time
class WorkThread(threading.Thread):
def __init__(self, signal):
threading.Thread.__init__(self)
self.singal = signal
def run(self):
print("妹纸 %s,睡觉了 ..." % self.name)
self.singal.wait()
print("妹纸 %s, 起床..." % self.name)
if __name__ == "__main__":
singal = threading.Event()
for t in range(0, 6):
thread = WorkThread(singal)
thread.start()
print("三秒钟后叫妹纸起床 ")
time.sleep(3)
#唤醒阻塞中的妹纸
singal.set() |
这里的的你就充当了主线程,每个妹纸就是一个子线程,不出意外三秒之后你就会按时唤醒所有的妹纸了:
妹纸 Thread-1,睡觉了 ... 妹纸 Thread-2,睡觉了 ... 妹纸 Thread-3,睡觉了 ... 妹纸 Thread-4,睡觉了 ... 妹纸 Thread-5,睡觉了 ... 妹纸 Thread-6,睡觉了 ... 三秒钟后叫妹纸起床 妹纸 Thread-1, 起床... 妹纸 Thread-2, 起床... 妹纸 Thread-5, 起床... 妹纸 Thread-4, 起床... 妹纸 Thread-3, 起床... 妹纸 Thread-6, 起床... |
使用Event实现线程通信通信固然可以,但是另一种进行线程通信的方式是借助队列,也就是Queue。在python的标准库中提供了线程安全的队列,基于FIFO(先进先出)实现,可以方便的帮助我们实现线程间的消息传递,使用非常简单,其原理也不难,用一张简单的图展示:
另外,凡是符合该种结构的多线程通信过程我们称之为生产者-消费者模型。
线程池
其实,有关多线程的使用时非常简单的,更多的是根据具体的业务情况编写相应的逻辑.初次之外,考虑处理器的资源毕竟是有限的,不能一味的创建线程,我曾看到有些小伙伴在写爬虫的时候,100个url就创建了100个线程,其后果可想而知,因此当有需求要用到很多线程时,考虑使用线程池技术。
分享结束后,同学们还向刘冬请教了在工作学习中遇到的相关问题,比如
1.生产者消费者模式是不是类似于发布者订阅者模式?
2.请问已经存在全局锁了,为什么还会有threading.Lock等?
不明白全局锁的作用是什么
3.单线程的IO复用(REACTOR)和多线程版的IO复用区别在哪
4.关于Thread类,target参数,传入的函数如果有返回值如何处理 |