python多线程模块threading

简介

threading是python是基于线程的并行模块,它基于更底层的_thread模块构建。

Thread

Thread类表示在单独的线程中运行的活动。常用的方式有两种:

  • 传入一个callable的对象给参数target
  • 写一个继承Thread的子类,重写run方法。

线程同步

其他线程可以调用一个线程的join() 方法。这会阻塞调用该方法的线程,直到被调用join()方法的线程终结。

守护线程

定义:设置为daemon的线程会随着主线程的退出而结束,而非daemon线程会阻塞主线程的退出

1
threading.Thread(target=func, daemon=True)

详见Daemon is not daemon, but what is it?

线程本地数据

1
2
mydata = threading.local()
mydata.x = 1

Lock 锁

原始锁处于”锁定“或者”非锁定“两种状态之一。它被创建时为非锁定状态。它有两个基本方法,

  • acquire():
    • 当状态为非锁定时: acquire()将状态改为锁定 并立即返回。
    • 当状态是锁定时,acquire() 将阻塞至其他线程调用release()将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。
  • release(): 只在锁定状态下调用; 它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发RuntimeError异常。

简单示例:

1
2
3
dataLock = threading.Lock()
with dataLock:
operate_your_data()

RLock 递归锁

重入锁是一个可以被同一个线程多次获取的同步基元组件。附加了 “所属线程“ 和 “递归等级“ 的概念。

Condition 条件对象

  • acquire():请求底层锁。
  • release():释放底层锁。
  • wait(timeout=None):等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发RuntimeError异常。
  • wait_for(predicate, timeout=None):等待,直到条件计算为真。
  • notify():默认唤醒一个等待这个条件的线程。
  • notify_all():唤醒所有正在等待这个条件的线程。

Semaphore 信号量

一个信号量管理一个内部计数器。

  • acquire():获取一个信号量,计数器-1。发现计数器为零时,将会阻塞,直到其它线程调用release() 方法。
  • release():释放一个信号量,计数器+1。

BoundedSemaphore(value=1):有界信号量通过检查以确保它当前的值不会超过初始值。

Event 事件

常用于线程间通信:一个线程发出事件信号,而其他线程等待该信号。一个事件对象管理一个内部标志

  • is_set():内部标志是否为True
  • set():将内部标志设为True
  • clear():将内部标志设为False
  • wait(timeout=None):阻塞线程知道内部标志为True

event实现一个生产线程和消费线程的通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time


product_event = threading.Event()
print(f"product_event state: {product_event.is_set()}")


def produce():
print("start produce ...")
time.sleep(5)
print("product produced.")
product_event.set()
print(f"product_event state: {product_event.is_set()}")


def consume():
print("consumer start wait ...")
product_event.wait(timeout=10)
print(f"product_event state: {product_event.is_set()}")
product_event.clear()
print("product consumed.")
print(f"product_event state: {product_event.is_set()}")


t_pro = threading.Thread(name="producer", target=produce)
t_con = threading.Thread(name="consumer", target=consume)

t_pro.start()
t_con.start()

Timer 定时器

表示一个操作应该在等待一定的时间之后运行,是Thread的子类。

  • cancel():停止计时器并取消执行;仅在等待状态时有效,

简单的示例,五秒后自动拍照:

1
2
3
WAIT_TIME = 5
t = threading.Timer(WAIT_TIME, take_picture, ())
t.start()

Barrier 栅栏

栅栏类提供一个简单的步原语,用于应对固定数量的线程需要彼此相互等待的情况。线程调用wait()方法后将阻塞,直到所有线程都调用了wait()方法。此时所有线程将被同时释放

形象理解:像全班组织出去春游,大家都要到指定地点(栅栏)进行等待,等人到齐,一起出发。

  • wait(timeout=None):冲出栅栏。当栅栏中所有线程都已经调用了这个函数,它们将同时被释放。
  • reset():重置栅栏为默认的初始态。
  • abort():将栅栏置为破损态。会导致所有的active或者future等待失败,并收到BrokenBarrierError
  • parties:冲出栅栏所需要的线程数量。
  • n_waiting:当前时刻正在栅栏中阻塞的线程数量。
  • broken:表示栅栏是否为破损态。

with语句中使用锁、条件和信号量

1
2
with some_lock:
# do something...

相当于:

1
2
3
4
5
some_lock.acquire()
try:
# do something...
finally:
some_lock.release()

LockRLockConditionSemaphoreBoundedSemaphore都服从服从 上下文管理协议

threading特性

存活

  • 一个程序如果有一个子线程启动了,如果这个线程不是daemon,那么即便没有join()同步,也会等子线程运行完毕再退出;反之如果是daemon,那么只要主线程退出,子线程也会退出。
  • 子线程挂了不影响master线程,master线程可以通过t.is_alive()来判断该线程是否存活。

GIL

GIL全称是Global Interpreter Lock,即全局解释器锁。由于官方的cpython解释器不是完全线程安全的,因此为了支持多线程编程,便有了GIL。有了GIL,在同一时刻,一个解释器进程下只能有一个线程在运行。因此threading模块不能使你利用多核CPU,如果你想利用多核CPU,可以使用multiprocessing模块。

选用时机

  • IO密集型任务:虽然因为GIL,python中的多线程不是真正并行的多线程,但是如果是IO密集型的操作,使用多线程还是会提速不少。原因是一个线程在IO时被阻塞,会被切换到另一个线程。
  • 单个进程内的并发任务:如果你需要在单个进程内继续使用并发的特性,可以考虑使用threading

参考资料