Asynchronous Programming in Python

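Advantages of coroutines: switching between coroutines is controlled explicitly by the program (at yield/await points) rather than by the OS scheduler. There is no thread context-switch overhead, so a single thread can run many concurrent tasks efficiently. This makes coroutines well suited to I/O-bound workloads. For CPU-bound work, coroutines alone do not help, because they all share one thread; the recommended approach there is multiple processes combined with coroutines.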
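The following minimal sketch makes "switching controlled by the program" concrete. The names worker and main are illustrative only. Two coroutines run on one thread and interleave only where they explicitly await, so the order of events is fully determined by the code.

```python
import asyncio

trace = []

async def worker(name, steps):
    for i in range(steps):
        trace.append(f"{name}{i}")
        # await asyncio.sleep(0) hands control back to the event loop;
        # a switch to another coroutine can only happen at such an await point.
        await asyncio.sleep(0)

async def main():
    # gather() wraps both coroutines in Tasks; both run on the same thread.
    await asyncio.gather(worker("A", 3), worker("B", 3))

asyncio.run(main())
print(trace)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

Remove the await and worker A would run all its steps before B starts. No preemption happens between awaits.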

Generator

Understanding the yield expression

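Once a function contains a yield expression, Python treats it as a generator function rather than an ordinary function. Calling it does not run the body; it returns a generator object.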

```python
def test():
    print("generator start")
    n = 1
    while True:
        yield_expression_value = yield n
        print("yield_expression_value = %d" % yield_expression_value)
        n += 1

# ① Create the generator object (the function body does not run yet)
generator = test()
print(type(generator))

print("\n---------------\n")

# ② Start the generator: run until the first yield and receive its value
next_result = next(generator)
print("next_result = %d" % next_result)

print("\n---------------\n")

# ③ Send a value in: it becomes the value of the paused yield expression
send_result = generator.send(666)
print("send_result = %d" % send_result)
```

Output:

```
<class 'generator'>

---------------

generator start
next_result = 1

---------------

yield_expression_value = 666
send_result = 2
```

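yield lets a function return more than once, from more than one place. The function returns a value at one point and remembers where it stopped. When it is resumed, execution continues from that point until the next yield.

This is exactly what a coroutine scheduler needs. When the target code reaches a point where it would otherwise wait, it first hands control back to the main program. The main program then decides when to resume the target code, which continues from where it left off.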
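The scheduling idea described above can be sketched with plain generators. This is a minimal illustration, not a real event loop; task and run are hypothetical names. Each task pauses at yield, and the main program decides in round-robin order which paused task to resume next.

```python
from collections import deque

def task(name, steps, log):
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # pause here and hand control back to the scheduler

def run(tasks):
    queue = deque(tasks)
    while queue:
        gen = queue.popleft()
        try:
            next(gen)          # resume the task from where it last paused
        except StopIteration:
            continue           # task finished; drop it
        queue.append(gen)      # not finished yet: requeue it at the back

log = []
run([task("A", 2, log), task("B", 3, log)])
print(log)  # ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```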

Understanding the yield from expression

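yield from delegates part of a generator's work to another generator. It also lets the subgenerator (the expression after yield from) return a value. That value becomes the value of the yield from expression in the delegating generator (the generator that contains yield from).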

```python
# Subgenerator
def test(n):
    i = 0
    while i < n:
        yield i
        i += 1

# Delegating generator
def test_yield_from(n):
    print("test_yield_from start")
    yield from test(n)
    print("test_yield_from end")

for i in test_yield_from(3):
    print(i)
```

Output:

```
test_yield_from start
0
1
2
test_yield_from end
```
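The example above only iterates, so it does not show the return-value part of yield from. The sketch below fills that gap; averager and delegator are illustrative names. Values sent to the delegating generator are forwarded to the subgenerator, and the subgenerator's return value becomes the result of the yield from expression.

```python
def averager():
    # Subgenerator: accumulate sent values until None is sent, then return the mean.
    total, count = 0, 0
    while True:
        value = yield
        if value is None:
            break
        total += value
        count += 1
    return total / count

def delegator(results):
    # Delegating generator: send() calls pass straight through to averager().
    result = yield from averager()
    results.append(result)

results = []
d = delegator(results)
next(d)                  # prime: run up to the first yield inside averager()
for v in (10, 20, 30):
    d.send(v)
try:
    d.send(None)         # averager() returns; delegator() then finishes too
except StopIteration:
    pass
print(results)  # [20.0]
```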

Coroutines

async/await

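Conceptually, async/await is syntactic sugar for the older @asyncio.coroutine decorator combined with yield from. await suspends the current coroutine and delegates to the awaited object, much as yield from delegates to a subgenerator. Note that native async def coroutines are a distinct type, and @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11.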
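The link between await and yield from is still visible in current Python. Any object whose __await__ method returns an iterator can be awaited, and writing __await__ as a generator lets it delegate with yield from. This is a minimal sketch; the Ready class is hypothetical and exists only for illustration.

```python
import asyncio

class Ready:
    """Hypothetical awaitable used only for illustration."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__ must return an iterator; as a generator it can delegate
        # to another awaitable's iterator with yield from.
        yield from asyncio.sleep(0).__await__()
        return self.value  # becomes the result of "await Ready(...)"

async def main():
    # await drives Ready(42).__await__() much like a yield from would.
    return await Ready(42)

value = asyncio.run(main())
print(value)  # 42
```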

```python
import asyncio

async def compute(x, y):  # define a coroutine function
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

print("start")
asyncio.run(print_sum(1, 2))
print("end")
```

Output:

```
start
Compute 1 + 2 ...
1 + 2 = 3
end
```

Awaitable objects

Coroutine/Task/Future

Coroutine

  • coroutine function: an async def function
  • coroutine object: an object returned by calling a coroutine function
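The distinction between these two terms shows up directly in code. In this short sketch (greet is an illustrative name), calling a coroutine function only creates a coroutine object. The body runs only once an event loop drives it.

```python
import asyncio
import inspect

async def greet(name):           # coroutine function
    return f"hello, {name}"

is_coro_func = inspect.iscoroutinefunction(greet)  # True
coro = greet("asyncio")          # coroutine object; the body has not run yet
is_coro_obj = inspect.iscoroutine(coro)            # True
result = asyncio.run(coro)       # the event loop actually executes the body
print(is_coro_func, is_coro_obj, result)  # True True hello, asyncio
```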

Task

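asyncio.create_task() wraps a coroutine in a Task and schedules it to run on the event loop as soon as possible. The coroutine then runs concurrently with the code that created the Task.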
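A minimal sketch of that scheduling behavior follows; job and its delays are illustrative assumptions. Both tasks start as soon as main suspends, and the shorter sleep finishes first even though t1 was created first.

```python
import asyncio

async def job(name, delay, log):
    log.append(f"{name} started")
    await asyncio.sleep(delay)
    log.append(f"{name} finished")
    return name

async def main():
    log = []
    # create_task() schedules each coroutine immediately; it does not wait for it.
    t1 = asyncio.create_task(job("t1", 0.1, log))
    t2 = asyncio.create_task(job("t2", 0.02, log))
    # Awaiting t1 suspends main; the loop then runs both scheduled tasks concurrently.
    results = [await t1, await t2]
    return log, results

log, results = asyncio.run(main())
print(log)      # ['t1 started', 't2 started', 't2 finished', 't1 finished']
print(results)  # ['t1', 't2']
```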

Future

A Future is a low-level awaitable object that represents the eventual result of an asynchronous operation.
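The following sketch shows a Future's lifecycle. A Future starts out pending, another piece of code supplies the result, and awaiting it returns that result. It also checks that a Task is itself a Future subclass. The result strings are arbitrary placeholders.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()               # low-level Future with no result yet
    pending = fut.done()                     # False
    loop.call_soon(fut.set_result, "done")   # the result is supplied elsewhere, later
    value = await fut                        # suspends until set_result() has run
    # asyncio.Task subclasses asyncio.Future and drives a coroutine to completion.
    task = asyncio.create_task(asyncio.sleep(0, result="task result"))
    return pending, value, fut.done(), isinstance(task, asyncio.Future), await task

out = asyncio.run(main())
print(out)  # (False, 'done', True, True, 'task result')
```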

Other core components

Semaphore

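It can be used to limit concurrency, that is, how many coroutines may be inside a given section at the same time.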

The core of the Semaphore implementation (from CPython's Lib/asyncio/locks.py):

```python
# Excerpt from CPython's Lib/asyncio/locks.py; the exact code differs between
# Python versions. It relies on module-internal names (collections, exceptions,
# mixins, _ContextManagerMixin), so it is meant for reading, not for running alone.
class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=mixins._marker):
        super().__init__(loop=loop)
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._waiters = None
        self._value = value

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    def locked(self):
        """Returns True if semaphore cannot be acquired immediately."""
        return self._value == 0 or (
            any(not w.cancelled() for w in (self._waiters or ())))

    async def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately. If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        if not self.locked():
            self._value -= 1
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._get_loop().create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_first() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            if not fut.cancelled():
                self._value += 1
                self._wake_up_next()
            raise

        if self._value > 0:
            self._wake_up_next()
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter that isn't done."""
        if not self._waiters:
            return

        for fut in self._waiters:
            if not fut.done():
                self._value -= 1
                fut.set_result(True)
                return

class _ContextManagerMixin:  # async context manager support
    async def __aenter__(self):
        await self.acquire()  # suspends here if the semaphore is locked
        # We have no use for the "as ..." clause in the with
        # statement for locks.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        self.release()
```

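To see the counter logic from the excerpt in action, the sketch below uses the public asyncio.Semaphore API. The comments track the internal counter described in the docstring. locked() becomes True once the counter reaches zero and turns False again after release().

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(2)       # internal counter starts at 2
    states = [sem.locked()]          # False: counter is 2
    await sem.acquire()              # counter 2 -> 1, returns immediately
    await sem.acquire()              # counter 1 -> 0, returns immediately
    states.append(sem.locked())      # True: a third acquire() would block
    sem.release()                    # counter 0 -> 1
    states.append(sem.locked())      # False again
    return states

states = asyncio.run(main())
print(states)  # [False, True, False]
```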
Common snippets

Running coroutine functions concurrently over a batch of work

```python
import asyncio

async def coroutine_func_1():
    print('call func 1')
    await asyncio.sleep(5)
    print('func 1 done!')
    return 1

async def coroutine_func_2():
    print('call func 2')
    await asyncio.sleep(5)
    print('func 2 done!')
    return 2

async def gather_data():
    # Both coroutines run concurrently, so this takes about 5 s rather than 10 s
    results = await asyncio.gather(coroutine_func_1(), coroutine_func_2())
    return results

results = asyncio.run(gather_data())
print(f"results: {results}")
```

Output:

```
call func 1
call func 2
func 1 done!
func 2 done!
results: [1, 2]
```
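When the same coroutine function is applied to each item of a list of inputs, gather can take one coroutine per item. The sketch below assumes a hypothetical process function that stands in for an I/O call. Its delays are chosen so later items finish first, which shows that gather returns results in argument order, not completion order.

```python
import asyncio

async def process(item):
    # Hypothetical stand-in for an I/O call; larger items finish sooner here.
    await asyncio.sleep(0.01 * (3 - item))
    return item * 10

async def main(items):
    # One coroutine per input; gather runs them all concurrently.
    return await asyncio.gather(*(process(i) for i in items))

results = asyncio.run(main([0, 1, 2]))
print(results)  # [0, 10, 20]: same order as the inputs
```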

Limiting concurrency

asyncio.Semaphore来实现。

```python
import asyncio

async def coroutine_func_1(sem):
    async with sem:
        print('call func 1')
        await asyncio.sleep(5)
        print('func 1 done!')
        return 1

async def coroutine_func_2(sem):
    async with sem:
        print('call func 2')
        await asyncio.sleep(5)
        print('func 2 done!')
        return 2

async def coroutine_func_3(sem):
    async with sem:
        print('call func 3')
        await asyncio.sleep(5)
        print('func 3 done!')
        return 3

async def gather_data():
    # At most 2 coroutines may be inside "async with sem" at once,
    # so func 3 starts only after one of the first two releases it
    sem = asyncio.Semaphore(2)
    results = await asyncio.gather(coroutine_func_1(sem),
                                   coroutine_func_2(sem),
                                   coroutine_func_3(sem))
    return results

results = asyncio.run(gather_data())
print(f"results: {results}")
```

Output:

```
call func 1
call func 2
func 1 done!
func 2 done!
call func 3
func 3 done!
results: [1, 2, 3]
```
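Reading the print order is only indirect evidence of the limit. The sketch below measures it instead. limited and the shared state dict are illustrative names. Each coroutine counts itself as active while inside async with sem, and the peak count observed across five coroutines equals the semaphore's initial value.

```python
import asyncio

async def limited(sem, state):
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)   # simulated I/O while holding the semaphore
        state["active"] -= 1

async def main(limit, n):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited(sem, state) for _ in range(n)))
    return state["peak"]

peak = asyncio.run(main(2, 5))
print(peak)  # 2
```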

Using Future

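```python
import asyncio

async def set_future_result(future, result):
    await asyncio.sleep(1)
    future.set_result(result)

async def main():
    # Inside a coroutine, get_running_loop() is preferred over get_event_loop()
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Schedule a task that sets the Future's result after 1 second;
    # keep a reference so the task is not garbage-collected mid-run
    task = loop.create_task(set_future_result(future, "Hello, asyncio!"))

    # Suspends until set_result() has been called
    result = await future
    print(f"Future result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```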

Output:

```
Future result: Hello, asyncio!
```
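A Future can also carry an error instead of a result. In this minimal sketch, the producer stores an exception with set_exception(). Awaiting the Future re-raises that exception in the consumer, where it can be handled like any other exception. The message "boom" is an arbitrary placeholder.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Instead of a result, the producer stores an exception in the Future.
    loop.call_soon(fut.set_exception, ValueError("boom"))
    try:
        await fut                 # re-raises the stored exception here
    except ValueError as exc:
        return f"caught: {exc}"

message = asyncio.run(main())
print(message)  # caught: boom
```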
