Python 的 2 个进程池相关 API
Intro
作为一名算法工程师,在我的工作中经常都会写各种 Python 脚本来处理大量数据,做数据清洗、信息提取等。通常情况下,这些数据的处理并不涉及竞争条件(Race Condition),而是简单的数据并行(Data Parallel),属于 CPU 密集型任务。通常情况下,这样的任务可以被抽象为 map(fn, data)
的模式
Python 因为存在 GIL 锁,这种情况下,采用多线程并不能提高处理的效率,而是应该采用多进程。另外,一般情况下写代码的时候都是直接创建进程池然后,今天本文要讲的就是 2 种进程池的 API:multiprocessing.Pool
和 concurrent.futures.ProcessPoolExecutor
,并进行一些简单的对比
在开始之前,先定义一个处理函数 task
以及导入必要的库,并假设它是一个计算密集的函数。这里用 time.sleep
随机暂停 1 ~ 3
秒假装它很耗时
import time
import random
import multiprocessing as mp
from concurrent import futures
def task(x: int, y: int) -> int:
# Let's assume it's a heavy task
time.sleep(random.randint(1, 5))
return x + y
并假设我们要处理的输入是 (1, 1), (2, 2), (3, 3), (4, 4)
,这可以用 xs
和 ys
构造出来
xs = range(1, 5)
ys = range(1, 5)
multiprocessing.Pool
multiprocessing
库提供了进程池的实现 Pool
,通过 processes
参数可以设置进程池的大小。multiprocessing.Pool
对象有很多的方法,比如 map, imap, apply
等。在本文的场景下,最适合的函数是 starmap
方法
def mp_starmap():
with mp.Pool(processes=4) as pool:
for result in pool.starmap(task, zip(xs, ys)):
print(f"{result=}")
if __name__ == "__main__":
mp_starmap()
如果 task
的入参只有 1 个,那么就用 pool.map
方法
concurrent.futures.ProcessPoolExecutor
从 Python 3.2 起,多了一个 concurrent.futures
包,里面也提供了 1 个进程池的实现,叫做 ProcessPoolExecutor
1
def exetutor_map():
with futures.ProcessPoolExecutor(max_workers=4) as exetutor:
for result in exetutor.map(task, xs, ys):
print(f"{result=}")
if __name__ == "__main__":
exetutor_map()
可以看到,代码几乎和 multiprocessing.Pool
的实现差不多,只是在名字和参数上换了一下。一个比较明显的区别是,executor.map
方法传入多个参数的时候更加自然,不需要再自己 zip
一下
executor.map(fn, *iterables)
的原理是用 future 对象包裹 fn(*iterables)
,它会返回一个关于 future 对象的生成器,当遍历这个生成器获取返回结果的时候,会调用每个 future 对象的 .result()
方法阻塞等待直到结果返回
如果你不想要用 executor.map
,也可以自己进行异步任务的提交和结果获取,需要用到 executor.submit
方法和 futures.as_completed
方法
executor.submit(fn, *args, **kwargs)
,提交fn(*args, **kwargs)
任务,它会将其变成一个 future 对象futures.as_completed(futures)
,参数是可迭代的 future 对象列表,它会返回一个迭代器,当底层的 future 对象执行完成之后会一一返回。注意它返回的是一个个 future 对象,所以还需要自己用.result()
获取每个 future 对象包裹的可调用对象返回的值
整体代码如下
def exetutor_submit_completed():
with futures.ProcessPoolExecutor() as exetutor:
todo = [exetutor.submit(task, x, y) for x, y in zip(xs, ys)]
for future in futures.as_completed(todo):
print(f"result={future.result()}")
if __name__ == "__main__":
exetutor_submit_completed()
手动 submit
有个不足是 as_completed
返回的 future 对象的顺序跟 submit
提交任务的顺序不匹配。这种情况下,经常会用一个字典来记住入参和 futures 对象之间的映射关系,如下所示
def exetutor_submit_completed_enhanced():
result = {}
with futures.ProcessPoolExecutor() as exetutor:
todo = {exetutor.submit(task, x, y): (x, y) for x, y in zip(xs, ys)}
for future in futures.as_completed(todo):
input_args = todo[future]
result[input_args] = future.result()
print(f"result={future.result()}")
if __name__ == "__main__":
exetutor_submit_completed_enhanced()
用哪一个?
就我个人观点,更推荐使用 concurrent.futures.ProcessPoolExecutor
,主要是出于下面几个原因
第一,executor.map
跟自带的 map
一样,都是 lazy 的,两者返回的都是 Python Generator。但 pool.map
是 eager 的,要用 lazy 的得换成 pool.imap
第二,当 task
函数有多个入参的时候(正如本文的例子),executor.map(task, args1, args2)
比 pool.starmap(task, zip(args1, args2))
语法上要优美一些。而且这两者其实是不对等的,一个是 lazy 的,一个是 eager 的,对等的 API 应该是 pool.istarmap
,但这个 API 并不存在。但可以通过 pool.imap
进行模拟,只是需要额外定义一个 helper 函数
def task_helper(seq):
return task(*seq)
def mp_imap():
with mp.Pool() as pool:
for result in pool.imap(task_helper, zip(xs, ys)):
print(f"{result=}")
if __name__ == "__main__":
mp_imap()
第三,可能也是最重要的一点。在使用 multiprocessing.Pool
的时候,如果其中一个进程被 kill
了,multiprocessing.Pool
会一直卡住不动。但如果是 concurrent.futures.ProcessPoolExecutor
,会立刻报一个异常,并显示如下的错误信息
concurrent.futures.process.BrokenProcessPool: A process in the process pool was\
terminated abruptly while the future was running or pending.
总结
用 concurrent.futures.ProcessPoolExecutor
实现进程池