Two Different APIs for Process Pools in Python
Intro
As a machine learning engineer, I work with vast amounts of data, performing tasks such as data cleaning and information extraction. These tasks are typically data-parallel and do not involve race conditions. Such workloads are usually referred to as CPU-intensive (CPU-bound) tasks, and they can typically be formulated as map(fn, data).
Python is limited by its Global Interpreter Lock (GIL), so multithreading cannot speed up CPU-bound work; multiprocessing should be used instead. Usually, you would not manage all the launched processes manually but would instead use a process pool. Today, I'd like to discuss two different APIs related to multiprocessing: multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, and provide a simple comparison.
Before we get started, let's define a function called task and import all the modules we need. We assume that the task function is computationally expensive and uses time.sleep to simulate its workload.
import time
import random
import multiprocessing as mp
from concurrent import futures
def task(x: int, y: int) -> int:
    # Let's assume it's a heavy task
    time.sleep(random.randint(1, 5))
    return x + y
The arguments of the task function are (1, 1), (2, 2), (3, 3), (4, 4), which can be constructed from xs and ys.
xs = range(1, 5)
ys = range(1, 5)
multiprocessing.Pool
The multiprocessing library provides a process pool implementation called Pool. By setting the processes parameter, we can control the size of the process pool. multiprocessing.Pool has many methods available, such as map, imap, and apply. In our example, the most suitable method is starmap, as shown below.
def mp_starmap():
    with mp.Pool(processes=4) as pool:
        for result in pool.starmap(task, zip(xs, ys)):
            print(f"{result=}")

if __name__ == "__main__":
    mp_starmap()
If the task function takes a single argument, the pool.map method should be used instead.
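For example, here is a minimal sketch assuming a hypothetical single-argument function single_task (not part of the original example), reusing the imports and xs defined above:
def single_task(x: int) -> int:
    # Hypothetical single-argument task, for illustration only
    time.sleep(random.randint(1, 5))
    return x * 2

def mp_map():
    with mp.Pool(processes=4) as pool:
        # pool.map takes a single iterable and passes one item per call
        for result in pool.map(single_task, xs):
            print(f"{result=}")

if __name__ == "__main__":
    mp_map()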
concurrent.futures.ProcessPoolExecutor
Starting from Python 3.2, the concurrent.futures module has been part of the standard library. It also provides a process pool implementation, called ProcessPoolExecutor.
def executor_map():
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        for result in executor.map(task, xs, ys):
            print(f"{result=}")

if __name__ == "__main__":
    executor_map()
From these two implementations of the same functionality, we can see that they are quite similar. One noticeable difference is that passing multiple arguments to executor.map feels more natural and doesn't require manually using zip.
Behind the scenes, executor.map(fn, *iterables) wraps each function call fn(*args) in a future object and returns a generator. When we iterate over the generator, it calls the .result() method of each future object and blocks until the result is available.
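As a minimal sketch of this laziness (reusing the task, xs, and ys defined above): the call to executor.map returns immediately, while each step of the iteration blocks until the corresponding result is ready.
def executor_map_lazy():
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(task, xs, ys)  # submits the tasks and returns a generator right away
        print("executor.map returned, now iterating...")
        for result in results:  # each iteration blocks until the next result is available
            print(f"{result=}")

if __name__ == "__main__":
    executor_map_lazy()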
If you prefer to manually submit the asynchronous tasks and retrieve their results, you can use the executor.submit and futures.as_completed methods.
executor.submit(fn, *args, **kwargs) submits the task fn(*args, **kwargs) and wraps it in a future object. futures.as_completed(futures) takes an iterable of future objects and returns an iterator that yields each future object as it completes. Note that you must call the .result() method on each future object to retrieve its result, as the iterator yields the future objects themselves.
See the code below.
def executor_submit_completed():
    with futures.ProcessPoolExecutor() as executor:
        todo = [executor.submit(task, x, y) for x, y in zip(xs, ys)]
        for future in futures.as_completed(todo):
            print(f"result={future.result()}")

if __name__ == "__main__":
    executor_submit_completed()
One pitfall is that the order in which as_completed yields future objects does not necessarily match the order in which the tasks were submitted. Therefore, a common idiom is to use a dictionary to maintain this mapping manually, as shown below.
def executor_submit_completed_enhanced():
    results = {}
    with futures.ProcessPoolExecutor() as executor:
        # Map each future back to the arguments it was submitted with
        todo = {executor.submit(task, x, y): (x, y) for x, y in zip(xs, ys)}
        for future in futures.as_completed(todo):
            input_args = todo[future]
            results[input_args] = future.result()
            print(f"{input_args=} result={results[input_args]}")

if __name__ == "__main__":
    executor_submit_completed_enhanced()
Which one to use?
In my opinion, concurrent.futures.ProcessPoolExecutor should be preferred, for the following reasons.
First, executor.map has similar semantics to the built-in map function: both are lazy, returning an iterator instead of a materialized list. However, pool.map uses eager evaluation, and you need pool.imap to get a lazy version.
Second, when the task function takes multiple arguments (as in our example), executor.map(task, args1, args2) is more elegant than pool.starmap(task, zip(args1, args2)). Moreover, these two APIs have different semantics: the former is lazy, while the latter is eager. The natural counterpart would be a pool.istarmap, but no such method exists. We could work around this by writing a tedious helper function and using pool.imap, as shown below, but it is clearly less convenient.
def task_helper(seq):
    # Unpack the argument tuple so pool.imap can call the two-argument task
    return task(*seq)

def mp_imap():
    with mp.Pool() as pool:
        for result in pool.imap(task_helper, zip(xs, ys)):
            print(f"{result=}")

if __name__ == "__main__":
    mp_imap()
Third, if a worker process is killed for some reason, a program using multiprocessing.Pool will hang indefinitely. With concurrent.futures.ProcessPoolExecutor, however, you get a clear error message:
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
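As a rough sketch of how to reproduce this behavior (the worker function die and the use of os._exit are illustrative assumptions, not part of the original example):
import os
from concurrent.futures.process import BrokenProcessPool

def die():
    # Simulate a worker process that is terminated abruptly
    os._exit(1)

def broken_pool_demo():
    with futures.ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(die)
        try:
            future.result()
        except BrokenProcessPool as exc:
            print(f"caught: {exc}")

if __name__ == "__main__":
    broken_pool_demo()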
Takeaway
Prefer concurrent.futures.ProcessPoolExecutor when creating a process pool.