Two Different APIs for Process Pools in Python

As a machine learning engineer, I work with vast amounts of data, performing tasks such as data cleaning and information extraction. These tasks are typically data-parallel and do not involve race conditions. Such workloads are usually referred to as CPU-bound (or CPU-intensive) tasks, and they can typically be formulated as map(fn, data).

Python is limited by its Global Interpreter Lock (GIL). As a result, multithreading cannot speed up CPU-bound code; multiprocessing should be used instead. Usually, you would not manage the launched processes manually but would instead use a process pool. Today, I’d like to discuss two different APIs for multiprocessing, multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, and provide a simple comparison.

Before we get started, let’s define a function called task and import all the modules we need. We assume that the task function is computationally expensive and use time.sleep to simulate its workload.

python

import time
import random
import multiprocessing as mp
from concurrent import futures


def task(x: int, y: int) -> int:
    # Let's assume it's a heavy task
    time.sleep(random.randint(1, 5))
    return x + y

The argument pairs for the task function are (1, 1), (2, 2), (3, 3), and (4, 4), which we construct from xs and ys.

python

xs = range(1, 5)
ys = range(1, 5)

The multiprocessing library provides a process pool implementation called Pool. The processes parameter controls the size of the process pool. multiprocessing.Pool offers many methods, such as map, imap, and apply. In our example, the most suitable one is starmap, as shown below.

python

def mp_starmap():
    with mp.Pool(processes=4) as pool:
        for result in pool.starmap(task, zip(xs, ys)):
            print(f"{result=}")


if __name__ == "__main__":
    mp_starmap()
Tip

If the task function takes a single argument, use the pool.map method instead, as sketched below.
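
For reference, here is a minimal sketch of that case, assuming the imports and xs defined above and a hypothetical single-argument function square.

python

def square(x: int) -> int:
    # Hypothetical single-argument task, used only for illustration
    return x * x


def mp_map():
    with mp.Pool(processes=4) as pool:
        # pool.map passes each element of xs as the single argument to square
        for result in pool.map(square, xs):
            print(f"{result=}")


if __name__ == "__main__":
    mp_map()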

Starting from Python 3.2, the concurrent.futures module has been part of the standard library. It also provides a process pool implementation, called ProcessPoolExecutor.

python

def executor_map():
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        for result in executor.map(task, xs, ys):
            print(f"{result=}")


if __name__ == "__main__":
    executor_map()

Comparing the two implementations, which have the same functionality, we can see that they are quite similar. One noticeable difference is that passing multiple argument iterables to executor.map feels more natural and does not require manually calling zip.

Tip

Behind the scenes, executor.map(fn, *iterables) wraps each function call fn(*args), where args is one item taken from each iterable, in a Future object and returns a generator. When we iterate over the generator, it calls the .result() method of each Future and blocks until the result is available.
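
As a rough, simplified sketch of that behavior (the real implementation also handles chunking and timeouts), executor.map(task, xs, ys) behaves roughly like submitting every call up front and yielding results in submission order:

python

def executor_map_equivalent():
    # Simplified sketch of executor.map: submit all calls immediately,
    # then block on each future in submission order.
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        submitted = [executor.submit(task, x, y) for x, y in zip(xs, ys)]
        for future in submitted:
            # .result() blocks until this particular future is done
            print(f"result={future.result()}")


if __name__ == "__main__":
    executor_map_equivalent()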

If you prefer to manually submit the asynchronous tasks and retrieve results, you can use the executor.submit and futures.as_completed methods.

  • executor.submit(fn, *args, **kwargs) schedules the call fn(*args, **kwargs) and returns a Future object representing its execution.
  • futures.as_completed(fs) takes an iterable of Future objects and returns an iterator that yields each Future as it completes. Note that you must call .result() on each yielded Future to retrieve its result, since the iterator yields the Future object itself.

See the code below.

python

def executor_submit_completed():
    with futures.ProcessPoolExecutor() as executor:
        todo = [executor.submit(task, x, y) for x, y in zip(xs, ys)]

        for future in futures.as_completed(todo):
            print(f"result={future.result()}")


if __name__ == "__main__":
    executor_submit_completed()

One pitfall is that as_completed yields futures in completion order, which generally does not match the order in which the tasks were submitted. A common idiom is therefore to use a dictionary to maintain the mapping between futures and their inputs, as shown below.

python

def executor_submit_completed_enhanced():
    results = {}
    with futures.ProcessPoolExecutor() as executor:
        # Map each future back to the arguments it was submitted with
        todo = {executor.submit(task, x, y): (x, y) for x, y in zip(xs, ys)}

        for future in futures.as_completed(todo):
            input_args = todo[future]
            results[input_args] = future.result()
            print(f"result={results[input_args]}")


if __name__ == "__main__":
    executor_submit_completed_enhanced()

In my opinion, concurrent.futures.ProcessPoolExecutor should be preferred, for the following reasons.

First, executor.map has semantics similar to the built-in map function: both return a generator, so results are consumed lazily. However, pool.map uses eager evaluation, and you need pool.imap to get a lazy version.
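
A small sketch of this difference, assuming the task, xs, and ys defined above: executor.map returns immediately with a generator, while pool.starmap (like pool.map) blocks until every result is ready and returns a list.

python

def lazy_vs_eager():
    # executor.map returns a generator right away; results are fetched
    # lazily as we iterate (although all tasks are submitted up front).
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        lazy = executor.map(task, xs, ys)
        print(type(lazy))  # <class 'generator'>
        for result in lazy:
            print(f"{result=}")

    # pool.starmap blocks here until every task has finished, then returns a list.
    with mp.Pool(processes=4) as pool:
        eager = pool.starmap(task, zip(xs, ys))
        print(type(eager))  # <class 'list'>


if __name__ == "__main__":
    lazy_vs_eager()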

Second, when the task function takes multiple arguments (as in our example), executor.map(task, args1, args2) is more elegant than pool.starmap(task, zip(args1, args2)). Moreover, the two APIs have different semantics: the former is lazy, while the latter is eager. The natural counterpart would be pool.istarmap, but it does not exist at all. We could work around this with a small helper function and pool.imap, like this:

python

def task_helper(seq):
    # Unpack the (x, y) tuple so task can be called through pool.imap
    return task(*seq)

def mp_imap():
    with mp.Pool() as pool:
        for result in pool.imap(task_helper, zip(xs, ys)):
            print(f"{result=}")


if __name__ == "__main__":
    mp_imap()

Third, if a worker process is killed for some reason, a program using multiprocessing.Pool will hang indefinitely. With concurrent.futures.ProcessPoolExecutor, however, you get a clear error message instead:

text

concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
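
If you want to handle that failure explicitly rather than letting it propagate, a minimal sketch (assuming the task, xs, and ys from above) could catch the exception around the result iteration:

python

from concurrent.futures.process import BrokenProcessPool


def executor_with_error_handling():
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        try:
            for result in executor.map(task, xs, ys):
                print(f"{result=}")
        except BrokenProcessPool as exc:
            # Raised when a worker process dies abruptly (e.g. killed by the OS)
            print(f"worker process died unexpectedly: {exc}")


if __name__ == "__main__":
    executor_with_error_handling()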

Prefer concurrent.futures.ProcessPoolExecutor when creating a process pool.