# 10.11 【并发编程】实战异步IO框架:asyncio 下篇
## 1. 动态添加协程
在实战之前,我们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。
如何实现呢,有两种方法:
- 主线程是同步的
```python
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
def do_sleep(x, queue, msg=""):
time.sleep(x)
queue.put(msg)
queue = Queue()
new_loop = asyncio.new_event_loop()
# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(time.ctime())
# 动态添加两个协程
# 这种方法,在主线程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")
while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())
```
由于是同步的,所以总共耗时6+3=9秒.
输出结果
```python
Thu May 31 22:11:16 2018
第一个 协程运行完..
Thu May 31 22:11:22 2018
第二个 协程运行完..
Thu May 31 22:11:25 2018
```
- 主线程是异步的,这是重点,一定要掌握。。
```python
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)
queue = Queue()
new_loop = asyncio.new_event_loop()
# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(time.ctime())
# 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)
while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())
```
输出结果
由于是同步的,所以总共耗时max(6, 3)=`6`秒
```python
Thu May 31 22:23:35 2018
第二个 协程运行完..
Thu May 31 22:23:38 2018
第一个 协程运行完..
Thu May 31 22:23:41 2018
```
## 2. 利用redis实现动态添加任务
对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。
为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。
先安装Redis
到 https://github.com/MicrosoftArchive/redis/releases 下载
![](https://i.loli.net/2018/06/03/5b13ba8525bcf.png)
解压到你的路径。
![](https://i.loli.net/2018/06/03/5b13ba9f66baa.png)
然后,在当前路径运行cmd,运行redis的服务端。
![](https://i.loli.net/2018/06/03/5b13bab682a32.png)
服务开启后,我们就可以运行我们的客户端了。
并依次输入key=queue,value=5,3,1的消息。
![](https://i.loli.net/2018/06/03/5b13bad79f5ce.png)
一切准备就绪之后,我们就可以运行我们的代码了。
```python
import time
import redis
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put("ok")
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
return redis.Redis(connection_pool=connection_pool)
def consumer():
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)
if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop()
# 定义一个线程,运行一个事件循环对象,用于实时接收新任务
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
# 创建redis连接
rcon = get_redis()
queue = Queue()
# 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()
while True:
msg = queue.get()
print("协程运行完..")
print("当前时间:", time.ctime())
```
稍微讲下代码
`loop_thread`:单独的线程,运行着一个事件对象容器,用于实时接收新任务。
`consumer_thread`:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。
输出结果
```python
Thu May 31 23:42:48 2018
协程运行完..
当前时间: Thu May 31 23:42:49 2018
协程运行完..
当前时间: Thu May 31 23:42:51 2018
协程运行完..
当前时间: Thu May 31 23:42:53 2018
```
我们在Redis,分别发起了5s,3s,1s的任务。
从结果来看,这三个任务,确实是并发执行的,1s的任务最先结束,三个任务完成总耗时5s
运行后,程序是一直运行在后台的,我们每一次在Redis中输入新值,都会触发新任务的执行。。
- 第一章:安装运行
- 1.1 【环境】快速安装 Python 解释器
- 1.2 【环境】Python 开发环境的搭建
- 1.3 【基础】两种运行 Python 程序方法
- 第二章:数据类型
- 2.1 【基础】常量与变量
- 2.2 【基础】字符串类型
- 2.3 【基础】整数与浮点数
- 2.4 【基础】布尔值:真与假
- 2.5 【基础】学会输入与输出
- 2.6 【基础】字符串格式化
- 2.6 【基础】运算符(超全整理)
- 第三章:数据结构
- 3.1 【基础】列表
- 3.2 【基础】元组
- 3.3 【基础】字典
- 3.4 【基础】集合
- 3.5 【基础】迭代器
- 3.6 【基础】生成器
- 第四章:控制流程
- 4.1 【基础】条件语句:if
- 4.2 【基础】循环语句:for
- 4.3 【基础】循环语句:while
- 4.4 【进阶】五种推导式
- 第五章:学习函数
- 5.1 【基础】普通函数
- 5.2 【基础】匿名函数
- 5.3 【基础】高阶函数
- 5.4 【基础】反射函数
- 5.5 【基础】偏函数
- 5.6 【进阶】泛型函数
- 5.7 【基础】变量的作用域
- 5.8 【进阶】上下文管理器
- 5.9 【进阶】装饰器的六种写法
- 第六章:错误异常
- 6.1 【基础】什么是异常?
- 6.2 【基础】如何抛出和捕获异常?
- 6.3 【基础】如何自定义异常?
- 6.4 【进阶】如何关闭异常自动关联上下文?
- 6.5 【进阶】异常处理的三个好习惯
- 第七章:类与对象
- 7.1 【基础】类的理解与使用
- 7.2 【基础】静态方法与类方法
- 7.3 【基础】私有变量与私有方法
- 7.4 【基础】类的封装(Encapsulation)
- 7.5 【基础】类的继承(Inheritance)
- 7.6 【基础】类的多态(Polymorphism)
- 7.7 【基础】类的 property 属性
- 7.8 【进阶】类的 Mixin 设计模式
- 7.9 【进阶】类的魔术方法(超全整理)
- 7.10 【进阶】神奇的元类编程(metaclass)
- 7.11 【进阶】深藏不露的描述符(Descriptor)
- 第八章:包与模块
- 8.1 【基础】什么是包、模块和库?
- 8.2 【基础】安装第三方包的八种方法
- 8.3 【基础】导入单元的构成
- 8.4 【基础】导入包的标准写法
- 8.5 【进阶】常规包与空间命名包
- 8.6 【进阶】花式导包的八种方法
- 8.7 【进阶】包导入的三个冷门知识点
- 8.8 【基础】pip 的超全使用指南
- 8.9 【进阶】理解模块的缓存
- 8.10 【进阶】理解查找器与加载器
- 8.11 【进阶】实现远程导入模块
- 8.12 【基础】分发工具:distutils和setuptools
- 8.13 【基础】源码包与二进制包有什么区别?
- 8.14 【基础】eggs与wheels 有什么区别?
- 8.15 【进阶】超详细讲解 setup.py 的编写
- 8.16 【进阶】打包辅助神器 PBR 是什么?
- 8.17 【进阶】开源自己的包到 PYPI 上
- 第九章:调试技巧
- 9.1 【调试技巧】超详细图文教你调试代码
- 9.2 【调试技巧】PyCharm 中指定参数调试程序
- 9.3 【调试技巧】PyCharm跑完后立即进入调试模式
- 9.4 【调试技巧】脚本报错后立即进入调试模式
- 9.5 【调试技巧】使用 PDB 进行无界面调试
- 9.6 【调试技巧】如何调试已经运行的程序?
- 9.7 【调试技巧】使用 PySnopper 调试疑难杂症
- 9.8 【调试技巧】使用 PyCharm 进行远程调试
- 第十章:并发编程
- 10.1 【并发编程】从性能角度初探并发编程
- 10.2 【并发编程】创建多线程的几种方法
- 10.3 【并发编程】谈谈线程中的“锁机制”
- 10.4 【并发编程】线程消息通信机制
- 10.5 【并发编程】线程中的信息隔离
- 10.6 【并发编程】线程池创建的几种方法
- 10.7 【并发编程】从 yield 开始入门协程
- 10.8 【并发编程】深入理解yield from语法
- 10.9 【并发编程】初识异步IO框架:asyncio 上篇
- 10.10 【并发编程】深入异步IO框架:asyncio 中篇
- 10.11 【并发编程】实战异步IO框架:asyncio 下篇
- 10.12 【并发编程】生成器与协程,你分清了吗?
- 10.14 【并发编程】浅谈线程安全那些事儿
- 第十二章:虚拟环境
- 12.1 【虚拟环境】为什么要有虚拟环境?
- 12.2 【虚拟环境】方案一:使用 virtualenv
- 12.3 【虚拟环境】方案二:使用 pipenv
- 12.4 【虚拟环境】方案三:使用 pipx
- 12.5 【虚拟环境】方案四:使用 poetry
- 第十三章:绝佳工具
- 13.1 【静态检查】mypy 的使用
- 13.2 【代码测试】pytest 的使用
- 13.3 【代码提交】pre-commit hook
- 13.4 【项目生成】cookiecutter 的使用
- 第十四章:数据可视化
- 14.1 【可视化之matplotlib】一图带你入门matplotlib
- 14.2 【可视化之matplotlib】详解六种可视化图表
- 14.3 【可视化之matplotlib】 绘制正余弦函数图象
- 14.4 【可视化之matplotlib】难点:子图与子区
- 14.5 【可视化之matplotlib】绘制酷炫的gif动态图
- 14.6 【可视化之matplotlib】自动生成图像视频
- 14.7 【可视化神器】最高级的可视化神器: plotly_express