[TOC]

Concurrency is the art of making a computer do (or appear to do) multiple things at once. Historically, this meant inviting the processor to switch between different tasks many times per second. In modern systems, it can also literally mean doing two or more things simultaneously on separate processor cores.

Concurrency is not inherently an object-oriented topic, but Python's concurrent systems are built on top of the object-oriented constructs we've covered throughout the book. This chapter will introduce you to the following topics:

• Threads
• Multiprocessing
• Futures
• AsyncIO

Concurrency is complicated. The basic concepts are fairly simple, but the bugs that can occur are notoriously difficult to track down. However, for many projects, concurrency is the only way to get the performance we need. Imagine if a web server couldn't respond to a user's request until the previous one was completed! We won't be going into all the details of just how hard it is (another full book would be required), but we'll see how to do basic concurrency in Python, and some of the most common pitfalls to avoid.

## Threads

Most often, concurrency is created so that work can continue happening while the program is waiting for I/O to happen. For example, a server can start processing a new network request while it waits for data from a previous request to arrive. An interactive program might render an animation or perform a calculation while waiting for the user to press a key. Bear in mind that while a person can type more than 500 characters per minute, a computer can perform billions of instructions per second. Thus, a ton of processing can happen between individual key presses, even when typing quickly.

It's theoretically possible to manage all this switching between activities within your program, but it would be virtually impossible to get right. Instead, we can rely on Python and the operating system to take care of the tricky switching part, while we create objects that appear to be running independently, but simultaneously. These objects are called threads; in Python they have a very simple API. Let's take a look at a basic example:

```
from threading import Thread

class InputReader(Thread):
    def run(self):
        self.line_of_text = input()

print("Enter some text and press enter: ")
thread = InputReader()
thread.start()

count = result = 1
while thread.is_alive():
    result = count * count
    count += 1

print("calculated squares up to {0} * {0} = {1}".format(
    count, result))
print("while you typed '{}'".format(thread.line_of_text))
```

This example runs two threads. Can you see them? Every program has one thread, called the main thread. The code that executes from the beginning is happening in this thread. The second thread, more obviously, exists as the InputReader class.

To construct a thread, we must extend the Thread class and implement the run method.
Any code inside the run method (or that is called from within that method) is executed in a separate thread.

The new thread doesn't start running until we call the start() method on the object. In this case, the thread immediately pauses to wait for input from the keyboard. In the meantime, the original thread continues executing at the point start was called. It starts calculating squares inside a while loop. The condition in the while loop checks if the InputReader thread has exited its run method yet; once it does, it outputs some summary information to the screen.

If we run the example and type the string "hello world", the output looks as follows:

```
Enter some text and press enter: hello world
calculated squares up to 1044477 * 1044477 = 1090930114576
while you typed 'hello world'
```

You will, of course, calculate more or less squares while typing the string, as the numbers are related both to our relative typing speeds and to the processor speeds of the computers we are running.

A thread only starts running in concurrent mode when we call the start method. If we want to take out the concurrent call to see how it compares, we can call thread.run() in the place that we originally called thread.start(). The output is telling:

```
Enter some text and press enter: hello world
calculated squares up to 1 * 1 = 1
while you typed 'hello world'
```

In this case, the thread never becomes alive and the while loop never executes. We wasted a lot of CPU power sitting idle while we were typing.

There are a lot of different patterns for using threads effectively. We won't be covering all of them, but we will look at a common one so we can learn about the join method. Let's check the current temperature in the capital city of every province in Canada:

```
from threading import Thread
import json
from urllib.request import urlopen
import time

CITIES = [
    'Edmonton', 'Victoria', 'Winnipeg', 'Fredericton',
    "St. John's", 'Halifax', 'Toronto', 'Charlottetown',
    'Quebec City', 'Regina'
]

class TempGetter(Thread):
    def __init__(self, city):
        super().__init__()
        self.city = city

    def run(self):
        url_template = (
            'http://api.openweathermap.org/data/2.5/'
            'weather?q={},CA&units=metric')
        response = urlopen(url_template.format(self.city))
        data = json.loads(response.read().decode())
        self.temperature = data['main']['temp']

threads = [TempGetter(c) for c in CITIES]
start = time.time()
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

for thread in threads:
    print(
        "it is {0.temperature:.0f}°C in {0.city}".format(thread))
print(
    "Got {} temps in {} seconds".format(
        len(threads), time.time() - start))
```

This code constructs 10 threads before starting them. Notice how we can override the constructor to pass data into the Thread object, remembering to call super to ensure the Thread is properly initialized.
Pay attention to this: the new thread isn't running yet, so the \_\_init\_\_ method is still executing from inside the main thread. Data we construct in one thread is accessible from other running threads.

After the 10 threads have been started, we loop over them again, calling the join() method on each. This method essentially says "wait for the thread to complete before doing anything". We call this ten times in sequence; the for loop won't exit until all ten threads have completed.

At this point, we can print the temperature that was stored on each thread object. Notice once again that we can access data that was constructed within the thread from the main thread. In threads, all state is shared by default.

Executing this code on my 100 mbit connection takes about two tenths of a second:

```
it is 5°C in Edmonton
it is 11°C in Victoria
it is 0°C in Winnipeg
it is -10°C in Fredericton
it is -12°C in St. John's
it is -8°C in Halifax
it is -6°C in Toronto
it is -13°C in Charlottetown
it is -12°C in Quebec City
it is 2°C in Regina
Got 10 temps in 0.18970298767089844 seconds
```

If we run this code in a single thread (by changing the start() call to run() and commenting out the join() call), it takes closer to 2 seconds because each 0.2 second request has to complete before the next one begins. This speedup of 10 times shows just how useful concurrent programming can be.

### The many problems with threads

Threads can be useful, especially in other programming languages, but modern Python programmers tend to avoid them for several reasons. As we'll see, there are other ways to do concurrent programming that are receiving more attention from the Python developers. Let's discuss some of these pitfalls before moving on to more salient topics.

#### Shared memory

The main problem with threads is also their primary advantage. Threads have access to all the memory and thus all the variables in the program. This can too easily cause inconsistencies in the program state. Have you ever encountered a room where a single light has two switches and two different people turn them on at the same time? Each person (thread) expects their action to turn the lamp (a variable) on, but the resulting value (the lamp is off) is inconsistent with those expectations. Now imagine if those two threads were transferring funds between bank accounts or managing the cruise control in a vehicle.

The solution to this problem in threaded programming is to "synchronize" access to any code that reads or writes a shared variable. There are a few different ways to do this, but we won't go into them here so we can focus on more Pythonic constructs. The synchronization solution works, but it is way too easy to forget to apply it. Worse, bugs due to inappropriate use of synchronization are really hard to track down because the order in which threads perform operations is inconsistent. We can't easily reproduce the error.
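For illustration only, here is a minimal sketch of what that synchronization looks like with threading.Lock; the shared counter is a hypothetical example, not one of this chapter's programs:

```
import threading

counter = 0
counter_lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # Only one thread at a time may hold the lock, so the
        # read-increment-write on the shared counter is safe.
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100000,))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # reliably 200000; without the lock it can fall short
```

Every access to the shared variable, everywhere in the program, has to remember to take the same lock; miss one and the bug is back.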
Usually, it is safest to force communication between threads to happen using a lightweight data structure that already uses locks appropriately. Python offers the queue.Queue class to do this; its functionality is basically the same as the multiprocessing.Queue that we will discuss in the next section.

In some cases, these disadvantages might be outweighed by the one advantage of allowing shared memory: it's fast. If multiple threads need access to a huge data structure, shared memory can provide that access quickly. However, this advantage is usually nullified by the fact that, in Python, it is impossible for two threads running on different CPU cores to be performing calculations at exactly the same time. This brings us to our second problem with threads.

#### The global interpreter lock

In order to efficiently manage memory, garbage collection, and calls to machine code in libraries, Python has a utility called the global interpreter lock, or GIL. It's impossible to turn off, and it means that threads are useless in Python for one thing that they excel at in other languages: parallel processing. The GIL's primary effect, for our purposes, is to prevent any two threads from doing work at the exact same time, even if they have work to do. In this case, "doing work" means using the CPU, so it's perfectly ok for multiple threads to access the disk or network; the GIL is released as soon as the thread starts to wait for something.

The GIL is quite highly disparaged, mostly by people who don't understand what it is or all the benefits it brings to Python. It would definitely be nice if our language didn't have this restriction, but the Python reference developers have determined that, for now at least, it brings more value than it costs. It makes the reference implementation easier to maintain and develop, and during the single-core processor days when Python was originally developed, it actually made the interpreter faster. The net result of the GIL, however, is that it limits the benefits that threads bring us, without alleviating the costs.

> While the GIL is a problem in the reference implementation of Python that most people use, it has been solved in some of the nonstandard implementations such as IronPython and Jython. Unfortunately, at the time of publication, none of these support Python 3.

### Thread overhead

One final limitation of threads as compared to the asynchronous system we will be discussing later is the cost of maintaining the thread. Each thread takes up a certain amount of memory (both in the Python process and the operating system kernel) to record the state of that thread. Switching between the threads also uses a (small) amount of CPU time. This work happens seamlessly without any extra coding (we just have to call start() and the rest is taken care of), but the work still has to happen somewhere.

This can be alleviated somewhat by structuring our workload so that threads can be reused to perform multiple jobs. Python provides a ThreadPool feature to handle this. It is shipped as part of the multiprocessing library and behaves identically to the ProcessPool that we will discuss shortly, so let's defer discussion until the next section.
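In the meantime, here is a minimal sketch of the idea, with a hypothetical fetch function and URL list; four threads are reused to service all ten jobs:

```
from multiprocessing.pool import ThreadPool
from urllib.request import urlopen

def fetch(url):
    # Runs in one of the pool's worker threads; blocking I/O is
    # fine here because the GIL is released while waiting.
    return url, len(urlopen(url).read())

urls = ['http://example.com'] * 10  # hypothetical workload

with ThreadPool(4) as pool:
    for url, size in pool.map(fetch, urls):
        print(url, size)
```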
## Multiprocessing

The multiprocessing API was originally designed to mimic the thread API. However, it has evolved, and in recent versions of Python 3, it supports more features more robustly. The multiprocessing library is designed for when CPU-intensive jobs need to happen in parallel and multiple cores are available (given that a four core Raspberry Pi can currently be purchased for $35, there are usually multiple cores available). Multiprocessing is not useful when the processes spend a majority of their time waiting on I/O (for example, network, disk, database, or keyboard), but it is the way to go for parallel computation.

The multiprocessing module spins up new operating system processes to do the work. On Windows machines, this is a relatively expensive operation; on Linux, processes are implemented in the kernel the same way threads are, so the overhead is limited to the cost of running separate Python interpreters in each process.

Let's try to parallelize a compute-heavy operation using similar constructs to those provided by the threading API:

```
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        for i in range(200000000):
            pass

if __name__ == '__main__':
    procs = [MuchCPU() for f in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print('work took {} seconds'.format(time.time() - t))
```

This example just ties up the CPU for 200 million iterations. You may not consider this to be useful work, but it's a cold day and I appreciate the heat my laptop generates under such load.

The API should be familiar; we implement a subclass of Process (instead of Thread) and implement a run method. This method prints out the process ID (a unique number the operating system assigns to each process on the machine) before doing some intense (if misguided) work.

Pay special attention to the if \_\_name\_\_ == '\_\_main\_\_': guard around the module level code, which prevents it from running if the module is being imported rather than run as a program. This is good practice in general, but when using multiprocessing on some operating systems, it is essential.
Behind the scenes, multiprocessing may have to import the module inside the new process in order to execute the run() method. If we allowed the entire module to execute at that point, it would start creating new processes recursively until the operating system ran out of resources.

We construct one process for each processor core on our machine, then start and join each of those processes. On my 2014 era quad-core laptop, the output looks like this:

```
6987
6988
6989
6990
work took 12.96659541130066 seconds
```

The first four lines are the process ID that was printed inside each MuchCPU instance. The last line shows that the 200 million iterations can run in about 13 seconds on my machine. During that 13 seconds, my process monitor indicated that all four of my cores were running at 100 percent.

If we subclass threading.Thread instead of multiprocessing.Process in MuchCPU, the output looks like this:

```
7235
7235
7235
7235
work took 28.577413082122803 seconds
```

This time, the four threads are running inside the same process and take close to three times as long to run. This is the cost of the global interpreter lock; in other languages or implementations of Python, the threaded version would run at least as fast as the multiprocessing version. We might expect it to be four times as long, but remember that many other programs are running on my laptop. In the multiprocessing version, these programs also need a share of the four CPUs. In the threading version, those programs can use the other three CPUs instead.

### Multiprocessing pools

In general, there is no reason to have more processes than there are processors on the computer. There are a few reasons for this:

• Only cpu\_count() processes can run simultaneously
• Each process consumes resources with a full copy of the Python interpreter
• Communication between processes is expensive
• Creating processes takes a nonzero amount of time

Given these constraints, it makes sense to create at most cpu\_count() processes when the program starts and then have them execute tasks as needed. It is not difficult to implement a basic series of communicating processes that does this, but it can be tricky to debug, test, and get right. Of course, Python being Python, we don't have to do all this work because the Python developers have already done it for us in the form of multiprocessing pools.

The primary advantage of pools is that they abstract away the overhead of figuring out what code is executing in the main process and which code is running in the subprocess.
As with the threading API that multiprocessing mimics, it can often be hard to remember who is executing what. The pool abstraction restricts the number of places that code in different processes interact with each other, making it much easier to keep track of.

Pools also seamlessly hide the process of passing data between processes. Using a pool looks much like a function call; you pass data into a function, it is executed in another process or processes, and when the work is done, a value is returned. It is important to understand that under the hood, a lot of work is being done to support this: objects in one process are being pickled and passed into a pipe. Another process retrieves data from the pipe and unpickles it. Work is done in the subprocess and a result is produced. The result is pickled and passed into a pipe. Eventually, the original process unpickles it and returns it.

All this pickling and passing data into pipes takes time and memory. Therefore, it is ideal to keep the amount and size of data passed into and returned from the pool to a minimum, and it is only advantageous to use the pool if a lot of processing has to be done on the data in question.

Armed with this knowledge, the code to make all this machinery work is surprisingly simple. Let's look at the problem of calculating all the prime factors of a list of random numbers. This is a common and expensive part of a variety of cryptography algorithms (not to mention attacks on those algorithms!). It requires years of processing power to crack the extremely large numbers used to secure your bank accounts. The following implementation, while readable, is not at all efficient, but that's ok because we want to see it using lots of CPU time:

```
import random
from multiprocessing.pool import Pool

def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors

if __name__ == '__main__':
    pool = Pool()

    to_factor = [
        random.randint(100000, 50000000) for i in range(20)
    ]
    results = pool.map(prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))
```

Let's focus on the parallel processing aspects, as the brute force recursive algorithm for calculating factors is pretty clear. We first construct a multiprocessing pool instance. By default, this pool creates a separate process for each of the CPU cores in the machine it is running on.

The map method accepts a function and an iterable. The pool pickles each of the values in the iterable and passes it into an available process, which executes the function on it. When that process is finished doing its work, it pickles the resulting list of factors and passes it back to the pool. Once all the pools are finished processing work (which could take some time), the results list is passed back to the original process, which has been waiting patiently for all this work to complete.

It is often more useful to use the similar map\_async method, which returns immediately even though the processes are still working. In that case, the results variable would not be a list of values, but a promise to return a list of values later by calling results.get(). This promise object also has methods like ready() and wait(), which allow us to check whether all the results are in yet.

Alternatively, if we don't know all the values we want to get results for in advance, we can use the apply\_async method to queue up a single job. If the pool has a process that isn't already working, it will start immediately; otherwise, it will hold onto the task until there is a free process available.

Pools can also be closed, which refuses to take any further tasks, but processes everything currently in the queue, or terminated, which goes one step further and refuses to start any jobs still on the queue, although any jobs currently running are still permitted to complete.
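For illustration, here is a minimal sketch of those asynchronous variants; it assumes the prime\_factor function defined in the previous example:

```
from multiprocessing.pool import Pool

# assumes prime_factor is defined as in the previous example

if __name__ == '__main__':
    pool = Pool()

    promise = pool.map_async(prime_factor, [103, 105, 1013])
    print("ready yet?", promise.ready())  # probably False; work continues
    print(promise.get())                  # blocks until all results arrive

    single = pool.apply_async(prime_factor, (1013,))
    print(single.get())

    pool.close()  # accept no new tasks, but finish the queue
    pool.join()
```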
### Queues

If we need more control over communication between processes, we can use a Queue. Queue data structures are useful for sending messages from one process into one or more other processes. Any picklable object can be sent into a Queue, but remember that pickling can be a costly operation, so keep such objects small. To illustrate queues, let's build a little search engine for text content that stores all relevant entries in memory.

This is not the most sensible way to build a text-based search engine, but I have used this pattern to query numerical data that needed to use CPU-intensive processes to construct a chart that was then rendered to the user.

This particular search engine scans all files in the current directory in parallel. A process is constructed for each core on the CPU. Each of these is instructed to load some of the files into memory. Let's look at the function that does the loading and searching:

```
def search(paths, query_q, results_q):
    lines = []
    for path in paths:
        lines.extend(l.strip() for l in path.open())

    query = query_q.get()
    while query:
        results_q.put([l for l in lines if query in l])
        query = query_q.get()
```

Remember, this function is run in a different process (in fact, it is run in cpu\_count() different processes) from the main thread. It is passed a list of path.path objects and two multiprocessing.Queue objects; one for incoming queries and one to send outgoing results.
These queues have a similar interface to the Queue class we discussed in Chapter 6, Python Data Structures. However, they are doing extra work to pickle the data in the queue and pass it into the subprocess over a pipe. These two queues are set up in the main process and passed through the pipes into the search function inside the child processes.

The search code is pretty dumb, both in terms of efficiency and of capabilities; it loops over every line stored in memory and puts the matching ones in a list. The list is placed on a queue and passed back to the main process.

Let's look at the main process, which sets up these queues:

```
if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    from path import path
    cpus = cpu_count()
    pathnames = [f for f in path('.').listdir() if f.isfile()]
    paths = [pathnames[i::cpus] for i in range(cpus)]
    query_queues = [Queue() for p in range(cpus)]
    results_queue = Queue()

    search_procs = [
        Process(target=search, args=(p, q, results_queue))
        for p, q in zip(paths, query_queues)
    ]
    for proc in search_procs:
        proc.start()
```

For easier description, let's assume cpu\_count is four. Notice how the import statements are placed inside the if guard? This is a small optimization that prevents them from being imported in each subprocess (where they aren't needed) on certain operating systems. We list all the paths in the current directory and then split the list into four approximately equal parts. We also construct a list of four Queue objects to send data into each subprocess. Finally, we construct a single results queue; this is passed into all four of the subprocesses. Each of them can put data into the queue and it will be aggregated in the main process.

Now let's look at the code that makes a search actually happen:

```
    for q in query_queues:
        q.put("def")
        q.put(None)  # Signal process termination

    for i in range(cpus):
        for match in results_queue.get():
            print(match)

    for proc in search_procs:
        proc.join()
```

This code performs a single search for "def" (because it's a common phrase in a directory full of Python files!). In a more production ready system, we would probably hook a socket up to this search code. In that case, we'd have to change the inter-process protocol so that the message coming back on the return queue contained enough information to identify which of many queries the results were attached to.

This use of queues is actually a local version of what could become a distributed system. Imagine if the searches were being sent out to multiple computers and then recombined. We won't discuss it here, but the multiprocessing module includes a manager class that can take a lot of the boilerplate out of the preceding code. There is even a version of the multiprocessing.Manager that can manage subprocesses on remote systems to construct a rudimentary distributed application. Check the Python multiprocessing documentation if you are interested in pursuing this further.
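For a taste of what the manager provides, here is a minimal sketch (the worker function and filenames are hypothetical, not part of the search example) that shares a managed dictionary between processes instead of wiring up queues by hand:

```
from multiprocessing import Manager, Process

def count_lines(path, results):
    # results is a proxy; assignments are forwarded to the
    # manager's server process, so every process sees them
    with open(path) as f:
        results[path] = sum(1 for line in f)

if __name__ == '__main__':
    with Manager() as manager:
        results = manager.dict()
        procs = [Process(target=count_lines, args=(p, results))
                 for p in ('a.txt', 'b.txt')]  # hypothetical files
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
        print(dict(results))
```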
### The problems with multiprocessing

As threads do, multiprocessing also has problems, some of which we have already discussed. There is no best way to do concurrency; this is especially true in Python. We always need to examine the parallel problem to figure out which of the many available solutions is the best one for that problem. Sometimes, there is no best solution.

In the case of multiprocessing, the primary drawback is that sharing data between processes is very costly. As we have discussed, all communication between processes, whether by queues, pipes, or a more implicit mechanism, requires pickling the objects. Excessive pickling quickly dominates processing time. Multiprocessing works best when relatively small objects are passed between processes and a tremendous amount of work needs to be done on each one. On the other hand, if no communication between processes is required, there may not be any point in using the module at all; we can spin up four separate Python processes and use them independently.

The other major problem with multiprocessing is that, like threads, it can be hard to tell which process a variable or method is being accessed in. In multiprocessing, if you access a variable from another process it will usually overwrite the variable in the currently running process while the other process keeps the old value. This is really confusing to maintain, so don't do it.

## Futures

Let's start looking at a more asynchronous way of doing concurrency. Futures wrap either multiprocessing or threading depending on what kind of concurrency we need (tending towards I/O versus tending towards CPU). They don't completely solve the problem of accidentally altering shared state, but they allow us to structure our code such that it is easier to track down when we do so. Futures provide distinct boundaries between the different threads or processes. Similar to the multiprocessing pool, they are useful for "call and answer" type interactions in which processing can happen in another thread and then at some point in the future (they are aptly named, after all), you can ask it for the result. It's really just a wrapper around multiprocessing pools and thread pools, but it provides a cleaner API and encourages nicer code.

A future is an object that basically wraps a function call. That function call is run in the background in a thread or process. The future object has methods to check if the future has completed and to get the results after it has completed.
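In code, that might look like the following minimal sketch, where slow\_double is a hypothetical stand-in for real work:

```
from concurrent.futures import ThreadPoolExecutor

def slow_double(x):
    # pretend this does expensive I/O or computation
    return x * 2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_double, 21)  # runs in a worker thread
    print(future.done())    # probably False; the work may still be running
    print(future.result())  # blocks until the call completes, then prints 42
```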
Let's do another file search example. In the last section, we implemented a version of the unix grep command. This time, let's do a simple version of the find command. The example will search the entire filesystem for paths that contain a given string of characters:

```
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from os.path import sep as pathsep
from collections import deque

def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
        full_path = str(p.absolute())
        if p.is_dir() and not p.is_symlink():
            subdirs.append(p)
        if query_string in full_path:
            print(full_path)

    return subdirs

query = '.py'
futures = deque()
basedir = Path(pathsep).absolute()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures.append(
        executor.submit(find_files, basedir, query))
    while futures:
        future = futures.popleft()
        if future.exception():
            continue
        elif future.done():
            subdirs = future.result()
            for subdir in subdirs:
                futures.append(executor.submit(
                    find_files, subdir, query))
        else:
            futures.append(future)
```

This code consists of a function named find\_files that is run in a separate thread (or process, if we used ProcessPoolExecutor). There isn't anything particularly special about this function, but note how it does not access any global variables. All interaction with the external environment is passed into the function or returned from it. This is not a technical requirement, but it is the best way to keep your brain inside your skull when programming with futures.

> Accessing outside variables without proper synchronization results in something called a race condition. For example, imagine two concurrent writes trying to increment an integer counter. They start at the same time and both read the value as 5. Then they both increment the value and write back the result as 6. But if two processes are trying to increment a variable, the expected result would be that it gets incremented by two, so the result should be 7. Modern wisdom is that the easiest way to avoid doing this is to keep as much state as possible private and share it through known-safe constructs, such as queues.
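That lost-update scenario is easy to demonstrate. The following sketch (a hypothetical demonstration, not part of the search example) can print less than 200000, depending on the interpreter version and timing, because the unsynchronized += is a separate read, increment, and write:

```
import threading

counter = 0

def unsafe_increment(n):
    global counter
    for _ in range(n):
        # read, add one, write back; another thread can interleave
        # between the read and the write, losing an update
        counter += 1

threads = [threading.Thread(target=unsafe_increment, args=(100000,))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # expected 200000, but may come up short
```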
We set up a couple variables before we get started; we'll be searching for all files that contain the characters '.py' for this example. We have a queue of futures that we'll discuss shortly. The basedir variable points to the root of the filesystem; '/' on Unix machines and probably C:\\ on Windows.

First, let's have a short course on search theory. This algorithm implements breadth first search in parallel. Rather than recursively searching every directory using a depth first search, it adds all the subdirectories in the current folder to the queue, then all the subdirectories of each of those folders and so on.

The meat of the program is known as an event loop. We can construct a ThreadPoolExecutor as a context manager so that it is automatically cleaned up and its threads closed when it is done. It requires a max\_workers argument to indicate the number of threads running at a time; if more than this many jobs are submitted, it queues up the rest until a worker thread becomes available. When using ProcessPoolExecutor, this is normally constrained to the number of CPUs on the machine, but with threads, it can be much higher, depending on how many are waiting on I/O at a time. Each thread takes up a certain amount of memory, so it shouldn't be too high; it doesn't take all that many threads before the speed of the disk, rather than the number of parallel requests, is the bottleneck.

Once the executor has been constructed, we submit a job to it using the root directory. The submit() method immediately returns a Future object, which promises to give us a result eventually. The future is placed on the queue. The loop then repeatedly removes the first future from the queue and inspects it. If it is still running, it gets added back to the end of the queue. Otherwise, we check if the function raised an exception with a call to future.exception(). If it did, we just ignore it (it's usually a permission error, although a real app would need to be more careful about what the exception was). If we didn't check this exception here, it would be raised when we called result() and could be handled through the normal try...except mechanism.

Assuming no exception occurred, we can call result() to get the return value of the function call. Since the function returns a list of subdirectories that are not symbolic links (my lazy way of preventing an infinite loop), result() returns the same thing. These new subdirectories are submitted to the executor and the resulting futures are tossed onto the queue to have their contents searched in a later iteration.

So that's all that is required to develop a future-based I/O-bound application. Under the hood, it's using the same thread or process APIs we've already discussed, but it provides a more understandable interface and makes it easier to see the boundaries between concurrently running functions (just don't try to access global variables from inside the future!).

## AsyncIO

AsyncIO is the current state of the art in Python concurrent programming. It combines the concept of futures and an event loop with the coroutines we discussed in Chapter 9, The Iterator Pattern.
The result is about as elegant and easy to understand as it is possible to get when writing concurrent code, though that isn't saying a lot!

AsyncIO can be used for a few different concurrent tasks, but it was specifically designed for network I/O. Most networking applications, especially on the server side, spend a lot of time waiting for data to come in from the network. This can be solved by handling each client in a separate thread, but threads use up memory and other resources. AsyncIO uses coroutines instead of threads.

The library also provides its own event loop, obviating the need for the several lines long while loop in the previous example. However, event loops come with a cost. When we run code in an async task on the event loop, that code must return immediately, blocking neither on I/O nor on long-running calculations. This is a minor thing when writing our own code, but it means that any standard library or third-party functions that block on I/O have to have non-blocking versions created.

AsyncIO solves this by creating a set of coroutines that use the yield from syntax to return control to the event loop immediately. The event loop takes care of checking whether the blocking call has completed and performing any subsequent tasks, just like we did manually in the previous section.

### AsyncIO in action

A canonical example of a blocking function is the time.sleep call. Let's use the asynchronous version of this call to illustrate the basics of an AsyncIO event loop:

```
import asyncio
import random

@asyncio.coroutine
def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
    yield from asyncio.sleep(delay)
    print("{} awakens".format(counter))

@asyncio.coroutine
def five_sleepers():
    print("Creating five tasks")
    tasks = [
        asyncio.async(random_sleep(i)) for i in range(5)]
    print("Sleeping after starting five tasks")
    yield from asyncio.sleep(2)
    print("Waking and waiting for five tasks")
    yield from asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(five_sleepers())
print("Done five tasks")
```

This is a fairly basic example, but it covers several features of AsyncIO programming. It is easiest to understand in the order that it executes, which is more or less bottom to top.

The second last line gets the event loop and instructs it to run a future until it is finished. The future in question is named five\_sleepers. Once that future has done its work, the loop will exit and our code will terminate. As asynchronous programmers, we don't need to know too much about what happens inside that run\_until\_complete call, but be aware that a lot is going on. It's a souped up coroutine version of the futures loop we wrote in the previous section that knows how to deal with iteration, exceptions, function returns, parallel calls, and more.
Now look a little more closely at that five\_sleepers future. Ignore the decorator for a few paragraphs; we'll get back to it. The coroutine first constructs five instances of the random\_sleep future. The resulting futures are wrapped in an asyncio.async task, which adds them to the loop's task queue so they can execute concurrently when control is returned to the event loop.

That control is returned whenever we call yield from. In this case, we call yield from asyncio.sleep to pause execution of this coroutine for two seconds. During this break, the event loop executes the tasks that it has queued up; namely the five random\_sleep futures. These coroutines each print a starting message, then send control back to the event loop for a specific amount of time.

If any of the sleep calls inside random\_sleep are shorter than two seconds, the event loop passes control back into the relevant future, which prints its awakening message before returning. When the sleep call inside five\_sleepers wakes up, it executes up to the next yield from call, which waits for the remaining random\_sleep tasks to complete. When all the sleep calls have finished executing, the random\_sleep tasks return, which removes them from the event queue. Once all five of those are completed, the asyncio.wait call and then the five\_sleepers method also return. Finally, since the event queue is now empty, the run\_until\_complete call is able to terminate and the program ends.

The asyncio.coroutine decorator mostly just documents that this coroutine is meant to be used as a future in an event loop. In this case, the program would run just fine without the decorator. However, the asyncio.coroutine decorator can also be used to wrap a normal function (one that doesn't yield) so that it can be treated as a future. In this case, the entire function executes before returning control to the event loop; the decorator just forces the function to fulfill the coroutine API so the event loop knows how to handle it.
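As an aside, this example uses the generator-based syntax that was current when AsyncIO was introduced. Python 3.5 later added dedicated async def and await keywords, and asyncio.async was renamed asyncio.ensure\_future; the same program in the newer style looks roughly like this:

```
import asyncio
import random

async def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
    await asyncio.sleep(delay)  # replaces yield from asyncio.sleep(delay)
    print("{} awakens".format(counter))

async def five_sleepers():
    print("Creating five tasks")
    tasks = [asyncio.ensure_future(random_sleep(i)) for i in range(5)]
    print("Sleeping after starting five tasks")
    await asyncio.sleep(2)
    print("Waking and waiting for five tasks")
    await asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(five_sleepers())
print("Done five tasks")
```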
### Reading an AsyncIO future

An AsyncIO coroutine executes each line in order until it encounters a yield from statement, at which point it returns control to the event loop. The event loop then executes any other tasks that are ready to run, including the one that the original coroutine was waiting on. Whenever that child task completes, the event loop sends the result back into the coroutine so that it can pick up executing until it encounters another yield from statement or returns.

This allows us to write code that executes synchronously until we explicitly need to wait for something. This removes the nondeterministic behavior of threads, so we don't need to worry nearly so much about shared state.

> It's still a good idea to avoid accessing shared state from inside a coroutine. It makes your code much easier to reason about. More importantly, even though an ideal world might have all asynchronous execution happen inside coroutines, the reality is that some futures are executed behind the scenes inside threads or processes. Stick to a "share nothing" philosophy to avoid a ton of difficult bugs.

In addition, AsyncIO allows us to collect logical sections of code together inside a single coroutine, even if we are waiting for other work elsewhere. As a specific instance, even though the yield from asyncio.sleep call in the random\_sleep coroutine is allowing a ton of stuff to happen inside the event loop, the coroutine itself looks like it's doing everything in order. This ability to read related pieces of asynchronous code without worrying about the machinery that waits for tasks to complete is the primary benefit of the AsyncIO module.

### AsyncIO for networking

AsyncIO was specifically designed for use with network sockets, so let's implement a DNS server. More accurately, let's implement one extremely basic feature of a DNS server.

The domain name system's basic purpose is to translate domain names, such as www.amazon.com, into IP addresses, such as 72.21.206.6. It has to be able to perform many types of queries and know how to contact other DNS servers if it doesn't have the answer required. We won't be implementing any of this, but the following example is able to respond directly to a standard DNS query to look up IPs for my three most recent employers:

```
import asyncio
from contextlib import suppress

ip_map = {
    b'facebook.com.': '173.252.120.6',
    b'yougov.com.': '213.52.133.246',
    b'wipo.int.': '193.5.93.80'
}

def lookup_dns(data):
    domain = b''
    pointer, part_length = 13, data[12]
    while part_length:
        domain += data[pointer:pointer+part_length] + b'.'
        pointer += part_length + 1
        part_length = data[pointer - 1]

    ip = ip_map.get(domain, '127.0.0.1')

    return domain, ip

def create_response(data, ip):
    ba = bytearray
    packet = ba(data[:2]) + ba([129, 128]) + data[4:6] * 2
    packet += ba(4) + data[12:]
    packet += ba([192, 12, 0, 1, 0, 1, 0, 0, 0, 60, 0, 4])
    for x in ip.split('.'):
        packet.append(int(x))
    return packet

class DNSProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        print("Received request from {}".format(addr[0]))
        domain, ip = lookup_dns(data)
        print("Sending IP {} for {} to {}".format(
            domain.decode(), ip, addr[0]))
        self.transport.sendto(
            create_response(data, ip), addr)

loop = asyncio.get_event_loop()
transport, protocol = loop.run_until_complete(
    loop.create_datagram_endpoint(
        DNSProtocol, local_addr=('127.0.0.1', 4343)))
print("DNS Server running")

with suppress(KeyboardInterrupt):
    loop.run_forever()
transport.close()
loop.close()
```

This example sets up a dictionary that dumbly maps a few domains to IPv4 addresses. It is followed by two functions that extract information from a binary DNS query packet and construct the response. We won't be discussing these; if you want to know more about DNS, read RFC ("request for comment", the format for defining most Internet protocols) 1034 and 1035.

You can test this service by running the following command in another terminal:

```
nslookup -port=4343 facebook.com localhost
```

Let's get on with the entrée. AsyncIO networking revolves around the intimately linked concepts of transports and protocols. A protocol is a class that has specific methods that are called when relevant events happen. Since DNS runs on top of UDP (User Datagram Protocol), we build our protocol class as a subclass of DatagramProtocol. This class has a variety of events that it can respond to; we are specifically interested in the initial connection occurring (solely so we can store the transport for future use) and the datagram\_received event. For DNS, each received datagram must be parsed and responded to, at which point the interaction is over.

So, when a datagram is received, we process the packet, look up the IP, and construct a response using the functions we aren't talking about (they're black sheep in the family). Then we instruct the underlying transport to send the resulting packet back to the requesting client using its sendto method.

The transport essentially represents a communication stream. In this case, it abstracts away all the fuss of sending and receiving data on a UDP socket on an event loop. There are similar transports for interacting with TCP sockets and subprocesses, for example.

The UDP transport is constructed by calling the loop's create\_datagram\_endpoint coroutine. This constructs the appropriate UDP socket and starts listening on it. We pass it the address that the socket needs to listen on, and importantly, the protocol class we created so that the transport knows what to call when it receives data.

Since the process of initializing a socket takes a non-trivial amount of time and would block the event loop, the create\_datagram\_endpoint function is a coroutine. In our example, we don't really need to do anything while we wait for this initialization, so we wrap the call in loop.run\_until\_complete. The event loop takes care of managing the future, and when it's complete, it returns a tuple of two values: the newly initialized transport and the protocol object that was constructed from the class we passed in.
Behind the scenes, the transport has set up a task on the event loop that is listening for incoming UDP connections. All we have to do, then, is start the event loop running with the call to loop.run\_forever() so that task can process these packets. When the packets arrive, they are processed on the protocol and everything just works.

The only other major thing to pay attention to is that transports (and, indeed, event loops) are supposed to be closed when we are finished with them. In this case, the code runs just fine without the two calls to close(), but if we were constructing transports on the fly (or just doing proper error handling!), we'd need to be quite a bit more conscious of it.

You may have been dismayed to see how much boilerplate is required in setting up a protocol class and underlying transport. AsyncIO provides an abstraction on top of these two key concepts called streams. We'll see an example of streams in the TCP server in the next example.

### Using executors to wrap blocking code

AsyncIO provides its own version of the futures library to allow us to run code in a separate thread or process when there isn't an appropriate non-blocking call to be made. This essentially allows us to combine threads and processes with the asynchronous model. One of the more useful applications of this feature is to get the best of both worlds when an application has bursts of I/O-bound and CPU-bound activity. The I/O-bound portions can happen in the event loop while the CPU-intensive work can be spun off to a different process.
To put this pattern to work in a full program, let's implement "sorting as a service" using AsyncIO:

```
import asyncio
import json
from concurrent.futures import ProcessPoolExecutor

def sort_in_process(data):
    nums = json.loads(data.decode())
    # Gnome sort: walk forward while in order, swap and step back otherwise
    curr = 1
    while curr < len(nums):
        if nums[curr] >= nums[curr - 1]:
            curr += 1
        else:
            nums[curr], nums[curr - 1] = \
                nums[curr - 1], nums[curr]
            if curr > 1:
                curr -= 1
    return json.dumps(nums).encode()

@asyncio.coroutine
def sort_request(reader, writer):
    print("Received connection")
    # An 8-byte big-endian length prefix, then exactly that many bytes
    length = yield from reader.read(8)
    data = yield from reader.readexactly(
        int.from_bytes(length, 'big'))
    # Hand the CPU-bound sort to the default (process pool) executor
    result = yield from asyncio.get_event_loop().run_in_executor(
        None, sort_in_process, data)
    print("Sorted list")
    writer.write(result)
    writer.close()
    print("Connection closed")

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
server = loop.run_until_complete(
    asyncio.start_server(sort_request, '127.0.0.1', 2015))
print("Sort Service running")

loop.run_forever()
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
```

This is an example of good code implementing some really stupid ideas. The whole idea of sort as a service is pretty ridiculous. Using our own sorting algorithm instead of calling Python's sorted is even worse. The algorithm we used is called gnome sort, or in some cases, "stupid sort". It is a slow sort algorithm implemented in pure Python. We defined our own protocol instead of using one of the many perfectly suitable application protocols that exist in the wild. Even the idea of using multiprocessing for parallelism might be suspect here; we still end up passing all the data into and out of the subprocesses. Sometimes, it's important to take a step back from the program you are writing and ask yourself whether you are trying to meet the right goals.

But let's look at some of the smart features of this design. First, we are passing bytes into and out of the subprocess. This is a lot smarter than decoding the JSON in the main process. It means the (relatively expensive) decoding can happen on a different CPU. Also, pickled JSON strings are generally smaller than pickled lists, so less data is passing between processes.

Second, the two methods are very linear; it looks like code is being executed one line after another. Of course, in AsyncIO, this is an illusion, but we don't have to worry about shared memory or concurrency primitives.

### Streams

The previous example should look familiar by now, as it has a similar boilerplate to other AsyncIO programs. However, there are a few differences. You'll notice we called start_server instead of create_server. This method hooks into AsyncIO's streams instead of using the underlying transport/protocol code. Instead of passing in a protocol class, we can pass in a normal coroutine, which receives reader and writer parameters.
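To see the wire format from the other side, here is a minimal client sketch (the sort_list coroutine is an illustrative name, not part of the book's example; asyncio.open_connection is the stream-based client counterpart to start_server):

```
import asyncio
import json

@asyncio.coroutine
def sort_list(nums):
    reader, writer = yield from asyncio.open_connection(
        '127.0.0.1', 2015)
    data = json.dumps(nums).encode()
    # An 8-byte big-endian length prefix, then the JSON payload
    writer.write(len(data).to_bytes(8, 'big'))
    writer.write(data)
    result = yield from reader.read()  # the server closes after replying
    writer.close()
    return json.loads(result.decode())

loop = asyncio.get_event_loop()
print(loop.run_until_complete(sort_list([3, 1, 2])))
loop.close()
```

Run while the service is up, this should print [1, 2, 3].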
The reader and writer parameters both represent streams of bytes that can be read from and written to like files or sockets. Second, because this is a TCP server instead of UDP, there is some socket cleanup required when the program finishes. This cleanup is a blocking call, so we have to run the wait_closed coroutine on the event loop.

Streams are fairly simple to understand. Reading is a potentially blocking call, so we have to call it with yield from. Writing doesn't block; it just puts the data on a queue, which AsyncIO sends out in the background.

Our code inside the sort_request method makes two read requests. First, it reads 8 bytes from the wire and converts them to an integer using big-endian notation. This integer represents the number of bytes of data the client intends to send. So, in the next call, to readexactly, it reads that many bytes. The difference between read and readexactly is that the former will read up to the requested number of bytes, while the latter will buffer reads until it receives all of them, or until the connection closes.

#### Executors

Now let's look at the executor code. We import the exact same ProcessPoolExecutor that we used in the previous section. Notice that we don't need a special AsyncIO version of it. The event loop has a handy run_in_executor coroutine that we can use to run futures on. By default, the loop runs code in a ThreadPoolExecutor, but we can pass in a different executor if we wish. Or, as we did in this example, we can set a different default when we set up the event loop by calling loop.set_default_executor().

As you probably recall from the previous section, there is not a lot of boilerplate for using futures with an executor. However, when we use them with AsyncIO, there is none at all! The coroutine automatically wraps the function call in a future and submits it to the executor. Our code blocks until the future completes, while the event loop continues processing other connections, tasks, or futures. When the future is done, the coroutine wakes up and continues on to write the data back to the client.

You may be wondering if, instead of running multiple processes inside an event loop, it might be better to run multiple event loops in different processes. The answer is: "maybe". However, depending on the exact problem space, we are probably better off running independent copies of a program with a single event loop than trying to coordinate everything with a master multiprocessing process.

We've hit most of the high points of AsyncIO in this section, and the chapter has covered many other concurrency primitives. Concurrency is a hard problem to solve, and no one solution fits all use cases.
The most important part of designing a concurrent system is deciding which of the available tools is the correct one to use for the problem at hand. We have seen the advantages and disadvantages of several concurrent systems, and now have some insight into which are the better choices for different types of requirements.

## Case study

## Summary