Python Asyncio 学习笔记

  1. 操作系统任务调度
  2. 进程、线程
  3. 协程
  4. Asyncio
  5. 结语
  6. 参考

操作系统任务调度

操作系统执行的任务基本可以分为:CPU 密集型、I/O 密集型。CPU 密集型任务会消耗大量的 CPU 计算资源,因此让操作系统调度任务的执行即可。而 I/O 密集型任务一般会涉及到硬盘 I/O、网络传输,大部分的时间在等待 I/O 的完成,因此出现了基于多任务系统的 CPU 任务调度。参考:IBM/调整 Linux I/O 调度器优化系统性能

在多任务系统中,操作系统接管了所有硬件资源并持有对硬件控制的最高权限。在操作系统中执行的程序,都以进程的方式运行在低的权限中。所有的硬件资源,由操作系统根据进程的优先级以及进程的运行状况进行统一的调度。

常见 Linux 操作系统抢占式任务处理(现代操作系统都支持抢占式多任务,包括 Windows、macOS、Linux(包括Android)和 iOS)

进程、线程

程序是一组指令的集合,程序运行时操作系统会将程序载入内存空间,在逻辑上产生一个单独的实例叫做进程(Process)。

随着多核 CPU 的发展,为了充分利用多核资源,需要进程内能并行地执行任务,因此产生了线程(Thread)的概念。

线程是操作系统进行任务调度的最小单元,线程存活于进程之中;同一个进程中的线程,共享一个虚拟内存空间;线程之间各自持有自己的线程 ID、当前指令的指针(PC)、寄存器集合以及栈。

线程和进程均由操作系统调度。

image.png

多线程的优势:

  1. 充分利用多核 CPU 资源(在 Python 中是不存在的);
  2. 将等待 I/O 操作的时间,调度到其他线程执行,提高 CPU 利用率;
  3. 将计算密集型的操作留给工作线程,预留线程保持与用户的交互;
  4. 同进程内多线程之间更加容易实现内存共享;

多线程从一定程度上提升了 CPU 资源的利用率,然而类似 C10K 等问题又开始让程序员对内核级别的上下文切换开销重视起来。

协程

协程让用户可以自主调度协程的运行状态(运行,挂起),协程可以看做是用户态线程,协程的目的在于让阻塞的 I/O 操作异步化。

一般子程序/函数的调用是按照顺序执行的,一个入口,一次返回。而协程可以在子程序 A 的调用过程中中断执行,转而调用另外一个子程序 B,在适当的时机再切回到子程序 A 继续执行,因此协程节省了多线程切换带来的开销问题,实现了在单线程中多线程的效果(当然,前提是各个子程序都是非阻塞的)。

协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存起来,在切回来的时候,恢复之前保存的寄存器上下文和栈,这种直接切换操作栈的方式(context上下文切换),避开了内核切换的开销,可以不加锁的访问全局变量,切换速度快。

协程的优势:

  1. 比线程开销小;
  2. 单线程模型,线程安全避免了资源竞争;
  3. 代码逻辑清晰,同步的方式编写异步逻辑代码;

Asyncio

Python 在 3.4 中引入了协程的概念3.5 确定了协程的语法,Asyncio 基本概念:

  • Event Loop 事件循环:程序开启一个 While True 循环,用户将一些函数注册到事件循环上,当满足事件执行条件时,调用的协程函数;
  • Coroutine 协程对象:使用 asnc关键字定义的函数,它的调用不会立即执行函数,而是返回一个协程对象,协程对象需要注册到事件循环中,由事件循环负责调用;
  • Task:对协程对象的进一步封装,包括任务的各种状态;
  • Future:代表将来执行或没有执行的任务的结果,和 Task 没有本质的区别;
  • async:定义一个协程对象;
  • await:挂起阻塞的异步调用接口;

tips : 使用 Cython + libuv 实现的 uvloop 可以提升事件循环更多的性能:

1
2
3
4
5
6
7
8
import asyncio
import uvloop
# 声明使用 uvloop 事件循环
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
...
...
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

协程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
# 阻塞
await asyncio.sleep(1.0)
return x + y

async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
# 注释掉 close 避免中断 notebook 执行
# loop.close()
Compute 1 + 2 ...
1 + 2 = 3

流程
image.png

使用 async 关键字定义一个协程(coroutine),协程是一个对象,直接调用并不会运行。可以通过在协程内部 await coroutine 或 yield from coroutine 运行,或者将协程加入到事件循环中让 EventLoop 调度执行。

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine
or yield from coroutine
from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method.
Coroutines (and tasks) can only run when the event loop is running.

定义一个协程(Coroutine)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import asyncio

# 定义协程
async def test(x):
print("wait:", x)
await asyncio.sleep(x)

start = time.time()

coroutine = test(1)

# 获取事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print("time:", time.time() - start)

# 输出:
# wait: 1
# time: 1.0050649642944336
wait: 1
time: 1.0018291473388672

定义一个任务(Task、Future)

Future 对象保存了协程的状态,可以用来获取协程的执行返回结果。
asyncio.ensure_future(coroutine)loop.create_task(coroutine) 都可以创建任务,run_until_complete 的参数是一个 futrue 对象。当传入一个协程方法时,其内部会自动封装成task,task是Future的子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import asyncio

# 定义协程
async def test(x):
print("wait:", x)
await asyncio.sleep(x)

start = time.time()

coroutine = test(1)

loop = asyncio.get_event_loop()
# future
# task = asyncio.ensure_future(coroutine)
# 显式创建任务:task 是 future 的子类
task = loop.create_task(coroutine)

print(task)
loop.run_until_complete(task)
print(task)
print("time:", time.time() - start)

# <Task pending coro=<test() running at xxx>>
# wait: 1
# <Task finished coro=<test() done, defined at xxx> result=None>
# time: 1.006286859512329
<Task pending coro=<test() running at <ipython-input-3-99e99d78180e>:5>>
wait: 1
<Task finished coro=<test() done, defined at <ipython-input-3-99e99d78180e>:5> result=None>
time: 1.0028810501098633

绑定回调获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
import asyncio

# 定义协程
async def test(x):
print("wait:", x)
await asyncio.sleep(x)
return "done of {}".format(x)

def callback(future):
print("callback:", future.result())

start = time.time()

coroutine = test(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
# 回调
task.add_done_callback(callback)
loop.run_until_complete(task)

# 直接获取
print("result:", task.result())
print("time:", time.time() - start)

# wait: 1
# callback: done of 1
# result: done of 1
# time: 1.0015690326690674
wait: 1
callback: done of 1
result: done of 1
time: 1.001621961593628

并发、并发控制

多个协程注册到事件循环中,当执行某一个协程时在任务阻塞的时候用 await 挂起,其他协程继续工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import time
import asyncio

async def test(x):
print("wait:", x)
await asyncio.sleep(x)
return "done of {}".format(x)

start = time.time()

# sleep 1s 2s 3s
coroutine1 = test(1)
coroutine2 = test(2)
coroutine3 = test(3)

loop = asyncio.get_event_loop()

task = [
loop.create_task(coroutine1),
loop.create_task(coroutine2),
loop.create_task(coroutine3)
]

# wait方式
# run_task = asyncio.wait(task)
# gather 能保证有序的结果返回
run_task = asyncio.gather(*task)

loop.run_until_complete(run_task)

for t in task:
print("task result:", t.result())

print("time:", time.time() - start)

# 输出:
# wait: 1
# wait: 2
# wait: 3
# task result: done of 1
# task result: done of 2
# task result: done of 3
# time: 3.0037271976470947
wait: 1
wait: 2
wait: 3
task result: done of 1
task result: done of 2
task result: done of 3
time: 3.0026791095733643

通过 Semaphore 信号量机制控制并发数量
通过 await 再调用另外一个协程,这样可以实现协程的嵌套

  • await asyncio.gather(*task)
  • await asyncio.wait(task)
  • asyncio.as_completed(task)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time
import asyncio
import aiohttp

URL = "https://www.baidu.com"

# 设置并发数:3
sema = asyncio.Semaphore(3)

cookie_jar = aiohttp.CookieJar(unsafe=True)
session = None

async def fetcher(url, index):
"""
通过 aiohttp 非阻塞的方式访问 URL 资源
"""
async with session.get(url) as resp:
print("start fetch index:{}".format(index))
# 假装多卡1秒
await asyncio.sleep(1)
return await resp.text()

async def worker(url, index):
"""
Semaphore信号量机制控制并发
"""
with (await sema):
resp = await fetcher(url, index)
return ("index:", index, len(resp), time.time())

async def dispatch(task_list):
"""
派发下载任务
"""
# init session
global session
session = aiohttp.ClientSession(cookie_jar=cookie_jar)
# send task
tasks = [asyncio.ensure_future(worker(URL, t)) for t in task_list]
for task in asyncio.as_completed(tasks):
resp = await task
print(resp)
# release session
session.close()

start = time.time()

loop = asyncio.get_event_loop()
coroutine = dispatch(range(5))
loop.run_until_complete(coroutine)
print("total time:", time.time() - start)

# 输出:
# start fetch index:2
# start fetch index:1
# start fetch index:0
# ('index:', 2, 227, 1508508870.628295)
# ('index:', 1, 227, 1508508870.642124)
# ('index:', 0, 227, 1508508870.6424)
# start fetch index:4
# start fetch index:3
# ('index:', 4, 227, 1508508871.736131)
# ('index:', 3, 227, 1508508871.737195)
# total time: 2.2324538230895996
start fetch index:0
start fetch index:2
start fetch index:1
('index:', 0, 227, 1509678077.006072)
('index:', 2, 227, 1509678077.0073102)
('index:', 1, 227, 1509678077.0080798)
start fetch index:4
start fetch index:3
('index:', 4, 227, 1509678078.032135)
('index:', 3, 227, 1509678078.032223)
total time: 2.2896299362182617

协程停止

Future 对象状态:

  • pending
  • running
  • waiting (瞎蒙的)
  • done
  • canceled

Future 对象在协程创建之后状态为 pending,事件循环调度执行协程时状态变为 running,想要停止协程,调用 future.cancel() 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import time
import asyncio

async def test(x):
print("wait:", x)
await asyncio.sleep(x)
return "done of {}".format(x)

coroutine1 = test(1)
coroutine2 = test(2)
coroutine3 = test(3)

loop = asyncio.get_event_loop()

task = [
loop.create_task(coroutine1),
loop.create_task(coroutine2),
loop.create_task(coroutine3),
]

start = time.time()

try:
loop.run_until_complete(asyncio.wait(task))
except KeyboardInterrupt:
print(asyncio.Task.all_tasks())
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()

print("time:", time.time() - start)

# 输出:
# wait: 1
# wait: 10
# wait: 15
# {<Task pending coro=<test() running at a.py:6> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<test() running at a.py:6> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<wait() running at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:355> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py:176]>, <Task finished coro=<test() done, defined at a.py:4> result='done of 1'>}
# True
# time: 1.4758961200714111
wait: 1
wait: 2
wait: 3
time: 3.0019960403442383

结语

Asyncio 对于熟悉 Tornado 或 Twisted 等异步框架的同学上手起来会很快,编程风格也可以很”同步化”。目前我们仅在生产环境尝试了 asyncio + aiohttp 作为网络采集的解决方案,初步使用下来感觉还是挺稳定的,并且避免了之前使用 Gevent Monkey Patch 的侵入式改动,Aysncio 还有更多的场景等待我们去发掘(比如 aiohttp 作为 Web 服务)。

目前 Github 开源的部分支持异步非阻塞的 aio 库,链接:https://github.com/aio-libs

对于新事物,永远保持一颗探索的心,共勉。

参考

https://docs.python.org/3/library/asyncio.html

https://liam0205.me/2017/01/17/layers-and-operation-system/

https://segmentfault.com/a/1190000003063859