Python 的 2 个进程池相关 API

作为一名算法工程师,在我的工作中经常都会写各种 Python 脚本来处理大量数据,做数据清洗、信息提取等。通常情况下,这些数据的处理并不涉及竞争条件(Race Condition),而是简单的数据并行(Data Parallel),属于 CPU 密集型任务。通常情况下,这样的任务可以被抽象为 map(fn, data) 的模式

Python 因为存在 GIL 锁,这种情况下,采用多线程并不能提高处理的效率,而是应该采用多进程。另外,一般情况下写代码的时候都是直接创建进程池然后,今天本文要讲的就是 2 种进程池的 API:multiprocessing.Poolconcurrent.futures.ProcessPoolExecutor,并进行一些简单的对比

在开始之前,先定义一个处理函数 task 以及导入必要的库,并假设它是一个计算密集的函数。这里用 time.sleep 随机暂停 1 ~ 3 秒假装它很耗时

python

import time
import random
import multiprocessing as mp
from concurrent import futures


def task(x: int, y: int) -> int:
    # Let's assume it's a heavy task
    time.sleep(random.randint(1, 5))
    return x + y

并假设我们要处理的输入是 (1, 1), (2, 2), (3, 3), (4, 4),这可以用 xsys 构造出来

python

xs = range(1, 5)
ys = range(1, 5)

multiprocessing 库提供了进程池的实现 Pool,通过 processes 参数可以设置进程池的大小。multiprocessing.Pool 对象有很多的方法,比如 map, imap, apply 等。在本文的场景下,最适合的函数是 starmap 方法

python

def mp_starmap():
    with mp.Pool(processes=4) as pool:
        for result in pool.starmap(task, zip(xs, ys)):
            print(f"{result=}")


if __name__ == "__main__":
    mp_starmap()
Tip

如果 task 的入参只有 1 个,那么就用 pool.map 方法

从 Python 3.2 起,多了一个 concurrent.futures 包,里面也提供了 1 个进程池的实现,叫做 ProcessPoolExecutor1

python

def exetutor_map():
    with futures.ProcessPoolExecutor(max_workers=4) as exetutor:
        for result in exetutor.map(task, xs, ys):
            print(f"{result=}")

if __name__ == "__main__":
    exetutor_map()

可以看到,代码几乎和 multiprocessing.Pool 的实现差不多,只是在名字和参数上换了一下。一个比较明显的区别是,executor.map 方法传入多个参数的时候更加自然,不需要再自己 zip 一下

Tip

executor.map(fn, *iterables) 的原理是用 future 对象包裹 fn(*iterables),它会返回一个关于 future 对象的生成器,当遍历这个生成器获取返回结果的时候,会调用每个 future 对象的 .result() 方法阻塞等待直到结果返回

如果你不想要用 executor.map,也可以自己进行异步任务的提交和结果获取,需要用到 executor.submit 方法和 futures.as_completed 方法

  • executor.submit(fn, *args, **kwargs),提交 fn(*args, **kwargs) 任务,它会将其变成一个 future 对象
  • futures.as_completed(futures),参数是可迭代的 future 对象列表,它会返回一个迭代器,当底层的 future 对象执行完成之后会一一返回。注意它返回的是一个个 future 对象,所以还需要自己用 .result() 获取每个 future 对象包裹的可调用对象返回的值

整体代码如下

python

def exetutor_submit_completed():
    with futures.ProcessPoolExecutor() as exetutor:
        todo = [exetutor.submit(task, x, y) for x, y in zip(xs, ys)]

        for future in futures.as_completed(todo):
            print(f"result={future.result()}")


if __name__ == "__main__":
    exetutor_submit_completed()

手动 submit 有个不足是 as_completed 返回的 future 对象的顺序跟 submit 提交任务的顺序不匹配。这种情况下,经常会用一个字典来记住入参和 futures 对象之间的映射关系,如下所示

python

def exetutor_submit_completed_enhanced():
    result = {}
    with futures.ProcessPoolExecutor() as exetutor:
        todo = {exetutor.submit(task, x, y): (x, y) for x, y in zip(xs, ys)}

        for future in futures.as_completed(todo):
            input_args = todo[future]
            result[input_args] = future.result()
            print(f"result={future.result()}")


if __name__ == "__main__":
    exetutor_submit_completed_enhanced()

就我个人观点,更推荐使用 concurrent.futures.ProcessPoolExecutor,主要是出于下面几个原因

第一,executor.map 跟自带的 map 一样,都是 lazy 的,两者返回的都是 Python Generator。但 pool.map 是 eager 的,要用 lazy 的得换成 pool.imap

第二,当 task 函数有多个入参的时候(正如本文的例子),executor.map(task, args1, args2)pool.starmap(task, zip(args1, args2)) 语法上要优美一些。而且这两者其实是不对等的,一个是 lazy 的,一个是 eager 的,对等的 API 应该是 pool.istarmap,但这个 API 并不存在。但可以通过 pool.imap 进行模拟,只是需要额外定义一个 helper 函数

python

def task_helper(seq):
    return task(*seq)

def mp_imap():
    with mp.Pool() as pool:
        for result in pool.imap(task_helper, zip(xs, ys)):
            print(f"{result=}")


if __name__ == "__main__":
    mp_imap()

第三,可能也是最重要的一点。在使用 multiprocessing.Pool 的时候,如果其中一个进程被 kill 了,multiprocessing.Pool一直卡住不动。但如果是 concurrent.futures.ProcessPoolExecutor,会立刻报一个异常,并显示如下的错误信息

text

concurrent.futures.process.BrokenProcessPool: A process in the process pool was\
terminated abruptly while the future was running or pending.

concurrent.futures.ProcessPoolExecutor 实现进程池