# 10.10 【并发编程】深入异步IO框架:asyncio 中篇
今天的内容其实还挺多的,我准备了三天,到今天才整理完毕。希望大家看完,有所收获的,能给小明一个赞。这就是对小明最大的鼓励了。
为了更好地衔接这一节,我们先来回顾一下上一节的内容。
上一节,我们首先介绍了,如何创建一个协程对象.
主要有两种方法
- 通过`async`关键字,
- 通过`@asyncio.coroutine` 装饰函数。
然后有了协程对象,就需要一个事件循环容器来运行我们的协程。其主要的步骤有如下几点:
- 将协程对象转为task任务对象
- 定义一个事件循环对象容器用来存放task
- 将task任务扔进事件循环对象中并触发
为了让大家,对生成器和协程有一个更加清晰的认识,我还介绍了`yield`和`async/await`的区别。
最后,我们还讲了,如何给一个协程添加回调函数。
好了,用个形象的比喻,上一节,其实就只是讲了协程中的`单任务`。哈哈,是不是还挺难的?希望大家一定要多看几遍,多敲代码,不要光看。
那么这一节,我们就来看下,协程中的`多任务`。
## 1. 协程中的并发
协程的并发,和线程一样。举个例子来说,就好像 一个人同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其他两个馒头。就这样交替换着吃。
`asyncio`实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。
第一步,当然是创建多个协程的列表。
```python
# 协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转成task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
```
第二步,如何将这些协程注册到事件循环中呢。
有两种方法,至于这两种方法什么区别,稍后会介绍。
- 使用`asyncio.wait()`
```python
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
- 使用`asyncio.gather()`
```python
# 千万注意,这里的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
```
最后,return的结果,可以用`task.result()`查看。
```python
for task in tasks:
print('Task ret: ', task.result())
```
完整代码如下
```python
import asyncio
# 协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转成task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
```
输出结果
```python
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
```
## 2. 协程中的嵌套
使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。
来看个例子。
```python
import asyncio
# 用于内部的协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 外部的协程函数
async def main():
# 创建三个协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转为task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
# 【重点】:await 一个task列表(协程)
# dones:表示已经完成的任务
# pendings:表示未完成的任务
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
如果这边,使用的是`asyncio.gather()`,是这么用的
```python
# 注意这边返回结果,与await不一样
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
```
输出还是一样的。
```
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
```
仔细查看,可以发现这个例子完全是由 上面「`协程中的并发`」例子改编而来。结果完全一样。只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。
其实你如果去看下`asyncio.await()`的源码的话,你会发现下面这种写法
```python
loop.run_until_complete(asyncio.wait(tasks))
```
看似没有嵌套,实际上内部也是嵌套的。
这里也把源码,贴出来,有兴趣可以看下,没兴趣,可以直接跳过。
```python
# 内部协程函数
async def _wait(fs, timeout, return_when, loop):
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
f.add_done_callback(_on_completion)
try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()
done, pending = set(), set()
for f in fs:
f.remove_done_callback(_on_completion)
if f.done():
done.add(f)
else:
pending.add(f)
return done, pending
# 外部协程函数
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
loop = events.get_event_loop()
fs = {ensure_future(f, loop=loop) for f in set(fs)}
# 【重点】:await一个内部协程
return await _wait(fs, timeout, return_when, loop)
```
## 3. 协程中的状态
还记得我们在讲生成器的时候,有提及过生成器的状态。同样,在协程这里,我们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。
>`Pending`:创建future,还未执行
>`Running`:事件循环正在调用执行任务
>`Done`:任务执行完毕
>`Cancelled`:Task被取消后的状态
可手工 `python3 xx.py` 执行这段代码,
```python
import asyncio
import threading
import time
async def hello():
print("Running in the loop...")
flag = 0
while flag < 1000:
with open("F:\\test.txt", "a") as f:
f.write("------")
flag += 1
print("Stop the loop")
if __name__ == '__main__':
coroutine = hello()
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
# Pending:未执行状态
print(task)
try:
t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
# t1.daemon = True
t1.start()
# Running:运行中状态
time.sleep(1)
print(task)
t1.join()
except KeyboardInterrupt as e:
# 取消任务
task.cancel()
# Cacelled:取消任务
print(task)
finally:
print(task)
```
顺利执行的话,将会打印 `Pending` -> `Pending:Runing` -> `Finished` 的状态变化
假如,执行后 立马按下 Ctrl+C,则会触发task取消,就会打印 `Pending` -> `Cancelling` -> `Cancelling` 的状态变化。
## 4. gather与wait
还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗?
- 使用`asyncio.wait()`
```python
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
- 使用`asyncio.gather()`
```python
# 千万注意,这里的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
```
`asyncio.gather` 和 `asyncio.wait` 在asyncio中用得的比较广泛,这里有必要好好研究下这两货。
还是照例用例子来说明,先定义一个协程函数
```python
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
```
## 5. 接收参数方式
### asyncio.wait
接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。
它可以这样,用`asyncio.ensure_future`转为task对象
```python
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
也可以这样,不转为task对象。
```python
loop = asyncio.get_event_loop()
tasks=[
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
]
loop.run_until_complete(asyncio.wait(tasks))
```
### asyncio.gather
接收的就比较广泛了,他可以接收list对象,但是 `*` 不能省略
```python
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
```
还可以这样,和上面的 `*` 作用一致,这是因为`asyncio.gather()`的第一个参数是 `*coros_or_futures`,它叫 `非命名键值可变长参数列表`,可以集合所有没有命名的变量。
```python
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
```
甚至还可以这样
```python
loop = asyncio.get_event_loop()
group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])
loop.run_until_complete(asyncio.gather(group1, group2, group3))
```
## 6. 返回结果不同
### asyncio.wait
`asyncio.wait` 返回`dones`和`pendings`
- `dones`:表示已经完成的任务
- `pendings`:表示未完成的任务
如果我们需要获取,运行结果,需要手工去收集获取。
```python
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
```
### asyncio.gather
`asyncio.gather` 它会把值直接返回给我们,不需要手工去收集。
```python
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
```
## 7. wait有控制功能
```python
import asyncio
import random
async def coro(tag):
await asyncio.sleep(random.uniform(0.5, 5))
loop = asyncio.get_event_loop()
tasks = [coro(i) for i in range(1, 11)]
# 【控制运行任务数】:运行第一个任务就返回
# FIRST_COMPLETED :第一个任务完全返回
# FIRST_EXCEPTION:产生第一个异常返回
# ALL_COMPLETED:所有任务完成返回 (默认选项)
dones, pendings = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("第一次完成的任务数:", len(dones))
# 【控制时间】:运行一秒后,就返回
dones2, pendings2 = loop.run_until_complete(
asyncio.wait(pendings, timeout=1))
print("第二次完成的任务数:", len(dones2))
# 【默认】:所有任务完成后返回
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))
print("第三次完成的任务数:", len(dones3))
loop.close()
```
输出结果
```python
第一次完成的任务数: 1
第二次完成的任务数: 4
第三次完成的任务数: 5
```
- 第一章:安装运行
- 1.1 【环境】快速安装 Python 解释器
- 1.2 【环境】Python 开发环境的搭建
- 1.3 【基础】两种运行 Python 程序方法
- 第二章:数据类型
- 2.1 【基础】常量与变量
- 2.2 【基础】字符串类型
- 2.3 【基础】整数与浮点数
- 2.4 【基础】布尔值:真与假
- 2.5 【基础】学会输入与输出
- 2.6 【基础】字符串格式化
- 2.6 【基础】运算符(超全整理)
- 第三章:数据结构
- 3.1 【基础】列表
- 3.2 【基础】元组
- 3.3 【基础】字典
- 3.4 【基础】集合
- 3.5 【基础】迭代器
- 3.6 【基础】生成器
- 第四章:控制流程
- 4.1 【基础】条件语句:if
- 4.2 【基础】循环语句:for
- 4.3 【基础】循环语句:while
- 4.4 【进阶】五种推导式
- 第五章:学习函数
- 5.1 【基础】普通函数
- 5.2 【基础】匿名函数
- 5.3 【基础】高阶函数
- 5.4 【基础】反射函数
- 5.5 【基础】偏函数
- 5.6 【进阶】泛型函数
- 5.7 【基础】变量的作用域
- 5.8 【进阶】上下文管理器
- 5.9 【进阶】装饰器的六种写法
- 第六章:错误异常
- 6.1 【基础】什么是异常?
- 6.2 【基础】如何抛出和捕获异常?
- 6.3 【基础】如何自定义异常?
- 6.4 【进阶】如何关闭异常自动关联上下文?
- 6.5 【进阶】异常处理的三个好习惯
- 第七章:类与对象
- 7.1 【基础】类的理解与使用
- 7.2 【基础】静态方法与类方法
- 7.3 【基础】私有变量与私有方法
- 7.4 【基础】类的封装(Encapsulation)
- 7.5 【基础】类的继承(Inheritance)
- 7.6 【基础】类的多态(Polymorphism)
- 7.7 【基础】类的 property 属性
- 7.8 【进阶】类的 Mixin 设计模式
- 7.9 【进阶】类的魔术方法(超全整理)
- 7.10 【进阶】神奇的元类编程(metaclass)
- 7.11 【进阶】深藏不露的描述符(Descriptor)
- 第八章:包与模块
- 8.1 【基础】什么是包、模块和库?
- 8.2 【基础】安装第三方包的八种方法
- 8.3 【基础】导入单元的构成
- 8.4 【基础】导入包的标准写法
- 8.5 【进阶】常规包与空间命名包
- 8.6 【进阶】花式导包的八种方法
- 8.7 【进阶】包导入的三个冷门知识点
- 8.8 【基础】pip 的超全使用指南
- 8.9 【进阶】理解模块的缓存
- 8.10 【进阶】理解查找器与加载器
- 8.11 【进阶】实现远程导入模块
- 8.12 【基础】分发工具:distutils和setuptools
- 8.13 【基础】源码包与二进制包有什么区别?
- 8.14 【基础】eggs与wheels 有什么区别?
- 8.15 【进阶】超详细讲解 setup.py 的编写
- 8.16 【进阶】打包辅助神器 PBR 是什么?
- 8.17 【进阶】开源自己的包到 PYPI 上
- 第九章:调试技巧
- 9.1 【调试技巧】超详细图文教你调试代码
- 9.2 【调试技巧】PyCharm 中指定参数调试程序
- 9.3 【调试技巧】PyCharm跑完后立即进入调试模式
- 9.4 【调试技巧】脚本报错后立即进入调试模式
- 9.5 【调试技巧】使用 PDB 进行无界面调试
- 9.6 【调试技巧】如何调试已经运行的程序?
- 9.7 【调试技巧】使用 PySnopper 调试疑难杂症
- 9.8 【调试技巧】使用 PyCharm 进行远程调试
- 第十章:并发编程
- 10.1 【并发编程】从性能角度初探并发编程
- 10.2 【并发编程】创建多线程的几种方法
- 10.3 【并发编程】谈谈线程中的“锁机制”
- 10.4 【并发编程】线程消息通信机制
- 10.5 【并发编程】线程中的信息隔离
- 10.6 【并发编程】线程池创建的几种方法
- 10.7 【并发编程】从 yield 开始入门协程
- 10.8 【并发编程】深入理解yield from语法
- 10.9 【并发编程】初识异步IO框架:asyncio 上篇
- 10.10 【并发编程】深入异步IO框架:asyncio 中篇
- 10.11 【并发编程】实战异步IO框架:asyncio 下篇
- 10.12 【并发编程】生成器与协程,你分清了吗?
- 10.14 【并发编程】浅谈线程安全那些事儿
- 第十二章:虚拟环境
- 12.1 【虚拟环境】为什么要有虚拟环境?
- 12.2 【虚拟环境】方案一:使用 virtualenv
- 12.3 【虚拟环境】方案二:使用 pipenv
- 12.4 【虚拟环境】方案三:使用 pipx
- 12.5 【虚拟环境】方案四:使用 poetry
- 第十三章:绝佳工具
- 13.1 【静态检查】mypy 的使用
- 13.2 【代码测试】pytest 的使用
- 13.3 【代码提交】pre-commit hook
- 13.4 【项目生成】cookiecutter 的使用
- 第十四章:数据可视化
- 14.1 【可视化之matplotlib】一图带你入门matplotlib
- 14.2 【可视化之matplotlib】详解六种可视化图表
- 14.3 【可视化之matplotlib】 绘制正余弦函数图象
- 14.4 【可视化之matplotlib】难点:子图与子区
- 14.5 【可视化之matplotlib】绘制酷炫的gif动态图
- 14.6 【可视化之matplotlib】自动生成图像视频
- 14.7 【可视化神器】最高级的可视化神器: plotly_express