Python – parallelizing CPU-bound tasks with concurrent.futures

January 16th, 2013 at 5:50 am

A year ago, I wrote a series of posts about using the Python multiprocessing module. One of the posts contrasted compute-intensive task parallelization using threads vs. processes. Today I want to revisit that topic, this time employing the concurrent.futures module, which has been part of the standard library since Python 3.2.

First of all, what are "futures"? The Wikipedia page says:

In computer science, future, promise, and delay refer to constructs used for synchronizing in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is yet incomplete.

I wouldn’t say "futures" is the best name choice, but this is what we’re stuck with, so let’s move on. Futures are actually a very nice tool that helps bridge an annoying gap that always exists in concurrent execution – the gap between launching some computation concurrently and obtaining the result of that computation. As my previous post showed, one of the common ways to deal with this gap is to pass a synchronized Queue object into every worker process (or thread) and then collect the results once the workers are done. Futures make this much easier and more elegant, as we’ll see.
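
To make the idea concrete before diving into the real example, here’s a minimal sketch (slow_square is a made-up stand-in for some expensive computation):

from concurrent.futures import ProcessPoolExecutor

def slow_square(n):
    # Stand-in for some expensive computation.
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(slow_square, 12)  # returns immediately
        # The caller is free to do other work here while the pool computes.
        print(future.result())  # blocks until the value is ready: 144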

For completeness, here is the computation we’re going to apply in parallel over a large number of inputs:

def factorize_naive(n):
    """ A naive factorization method. Take integer 'n', return list of
        factors.
    """
    if n < 2:
        return []
    factors = []
    p = 2

    while True:
        if n == 1:
            return factors

        r = n % p
        if r == 0:
            factors.append(p)
            n = n // p
        elif p * p >= n:
            factors.append(n)
            return factors
        elif p > 2:
            # Advance in steps of 2 over odd numbers
            p += 2
        else:
            # If p == 2, get to 3
            p += 1
    assert False, "unreachable"
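
To make the expected output concrete, a quick sanity check (1001 = 7 * 11 * 13):

>>> factorize_naive(1001)
[7, 11, 13]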

And here’s the first attempt at doing that with concurrent.futures:

import math
from concurrent.futures import ProcessPoolExecutor, as_completed

def chunked_worker(nums):
    """ Factorize a list of numbers, returning a num:factors mapping.
    """
    return {n: factorize_naive(n) for n in nums}


def pool_factorizer_chunked(nums, nprocs):
    # Manually divide the task into chunks of equal length, submitting each
    # chunk to the pool.
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    futures = []

    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        for i in range(nprocs):
            chunk = nums[(chunksize * i) : (chunksize * (i + 1))]
            futures.append(executor.submit(chunked_worker, chunk))

        # Collect results inside the 'with' block, so we consume each
        # future as soon as it completes rather than after shutdown.
        resultdict = {}
        for f in as_completed(futures):
            resultdict.update(f.result())
    return resultdict

The end result of pool_factorizer_chunked is a dictionary mapping numbers to lists of their factors. The most interesting thing to note here is this: the function run in a worker process (chunked_worker in this case) can simply return a value. For each such "call" (submission to the executor), a future is returned. This future encapsulates the result of the execution, which is probably not ready immediately but will be at some point. The concurrent.futures.as_completed helper lets us simply wait on all the futures and yields each one as soon as it’s done.
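
One detail worth pointing out: if the function raised an exception inside a worker process, calling result() re-raises that exception in the caller. A sketch of how the collection loop could guard against that:

resultdict = {}
for f in as_completed(futures):
    try:
        resultdict.update(f.result())
    except Exception as exc:
        # An exception raised in the worker propagates here.
        print('a worker failed with:', exc)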

It’s easy to see that this code is conceptually simpler than manually launching the processes, passing some sort of synchronization queues to the workers and collecting results. This, IMHO, is the main goal of futures. Futures aren’t there to make your code faster; they’re there to make it simpler. And any simplification is a blessing where parallel programming is concerned.

Note also that ProcessPoolExecutor is used as a context manager – this makes process cleanup automatic and reliable. For more fine-grained control, it has a shutdown method that can be called explicitly.
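
For illustration, here’s a sketch of the manual equivalent (chunks stands for the list of input chunks prepared as above; shutdown(wait=True) blocks until all pending futures are done executing):

executor = ProcessPoolExecutor(max_workers=nprocs)
try:
    futures = [executor.submit(chunked_worker, chunk) for chunk in chunks]
finally:
    executor.shutdown(wait=True)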

There’s more. Since the concurrent.futures module makes it possible to simply return a value from a concurrent call, it has another tool that makes computations like the above even simpler – Executor.map. Here’s the same task rewritten with map:

def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num: factors for num, factors in
                zip(nums, executor.map(factorize_naive, nums))}

Amazingly, this is it. This small function (a potential two-liner, were it not for the wrapping for readability) creates a process pool, submits a bunch of tasks to it, collects all the results when they’re ready, puts them into a single dictionary and returns it.

As for performance, the second method is also a bit faster in my benchmarks. This makes sense, because the manual division into chunks doesn’t take into account which chunks will take longer, so the division of work between the workers may be unbalanced. The map method submits each input as a separate task, so a free worker always has more work to pick up, and all the workers are kept busy until everything is done.
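
If you want to measure this yourself, here’s a minimal timing harness along these lines (the input range and nprocs=4 are illustrative; absolute numbers will vary by machine):

import time

if __name__ == '__main__':
    nums = list(range(20000, 40000))
    for factorizer in (pool_factorizer_chunked, pool_factorizer_map):
        t1 = time.time()
        factorizer(nums, nprocs=4)
        print('%s took %.3f sec' % (factorizer.__name__, time.time() - t1))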

To conclude, I strongly recommend using concurrent.futures whenever the possibility presents itself. Futures are much simpler conceptually, and hence less error-prone, than the manual method of creating processes and keeping track of their results. In practice, a future is a nice way to convey the result of a computation from the process producing it to the process consuming it. It’s like creating a result queue manually, but with a lot of useful semantics implemented. Futures can be polled and cancelled, provide useful access to exceptions, and can have callbacks attached to them. My examples here are simplistic and don’t show the cooler features, but if you need them – the documentation is pretty good.
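
To give a small taste of those features, here’s a sketch of attaching a callback (this reuses factorize_naive and the executor imports from above; add_done_callback invokes the callable with the finished future):

def report_done(future):
    # exception() returns None if the call completed without raising.
    if future.exception() is not None:
        print('worker failed with:', future.exception())
    else:
        print('factors:', future.result())

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(factorize_naive, 999999)
        future.add_done_callback(report_done)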

Related posts:

  1. Python – parallelizing CPU-bound tasks with multiprocessing
  2. Distributed computing in Python with multiprocessing
  3. tasks that stick
  4. perl master, C++ slave, bound for serial port programming
  5. Free and bound variables in Lisp

12 Responses to “Python – parallelizing CPU-bound tasks with concurrent.futures”

  1. xavier Says:

    Talking about Python and performance, I guess you know about Numba:
    http://numba.pydata.org/
    Unladen Swallow was a dead end, but I think Numba got it right.

  2. Matěj Cepl Says:

    What are your opinions on actors? I haven’t managed to learn enough about them to make them useful for me, but so far it seems to me that they are a proper abstraction for something of which concurrent.futures is just a partial solution (and yes, internally actors use futures as well). And yes, there is pykka (a library for actors in Python). I would be interested in your opinion.

    Thank you very much,

    Matěj

  3. eliben Says:

    @Matěj,

    I don’t really have an opinion about actors since I haven’t used them. I tried to stick to the standard lib, and ventured outside only to Twisted.

  4. Xavier Lapointe Says:

    Just thought it would be worth mentioning that it’s available in Python 2.7 as well (backport): http://pypi.python.org/pypi/futures

    Great post!

  5. Daniel From Paris Says:

    Hi Eli,

    I am impressed by the brevity-and-clarity of your demo. As always!

    Currently, I am prudently starting a rewrite in python+wxpython of an application that may (or may not) be cpu-intensive but is certainly cpu-bound, and I’d like an effective way to harness the power of the current cpu(s) without diving into highly complex synchronisation code.

    I definitely hate the idea of sharing code on a multi-thread basis since the focus of this app is definitely not performance.

    However, since 3.x should be available soon within wxpython, concurrent.futures will be as well.

    This application does not require massively parallel code but could possibly take advantage of a clear cut between a UI process and a calculation one!

    I would definitely like the idea of using your code, at least for a start, as an entry point. Would that make sense in your opinion?

  6. eliben Says:

    @Daniel,

    This very much depends on the application, of course. As my earlier posts discuss, in Python multi-processing is the only way you can really parallelize cpu-bound computations. Threads are not an option.

  7. Matěj Cepl Says:

    @Daniel you are aware that the module concurrent.futures has been backported to python 2.[5-7], aren’t you? (http://pypi.python.org/pypi/futures/)

  8. Rob Says:

    Hi guys, great post! I have a question: is it possible for a job to return a result that is a custom object? The only examples I have found online are for cases where future.result() is a simple type like an int. In the context of this example, what if factorize_naive were to return a custom object? I tried this and got the error “TypeError: ‘method’ object is not subscriptable”. Is it possible to return custom objects, or only simple types? Thanks! Rob.

  9. eliben Says:

    @Rob, the workers in my example return dicts. You should be able to return other kinds of objects too.

  10. Rob Says:

    @eliben … thanks… yes, I have it working now. PS: suppose I want to fit a regression model to a dataset, and I want each process to fit a model to a bootstrap resampling of the original data set, so I am fitting N models over N processes. Do you guys have any advice on how I should share the data across the processes? I have already developed a working example; however, I want to better understand the problem so that I can design the code to be as efficient as possible.

  11. sarah Says:

    Hello
    Just thought it would be worth mentioning that it’s available in Python 2.7 as well.
    Thanks for posting.

  12. Juan Luis Says:

    Hello, I’m reading your articles on parallelizing CPU-bound tasks, and I don’t really see the advantage of using concurrent.futures.ProcessPoolExecutor over multiprocessing.Pool. The syntax is almost the same, at least for simple cases like this one:

    https://gist.github.com/Juanlu001/6470913

    Yet the performance of pool_futures.py is way lower than pool_multiprocessing.py on my computer:

    [juanlu@nebulae]─[~/Development/Python/test]
    └[00:49:07] $ time python pool_multiprocessing.py

    real 0m0.205s
    user 0m0.230s
    sys 0m0.023s
    [juanlu@nebulae]─[~/Development/Python/test]
    └[00:49:11] $ time python pool_futures.py

    real 0m2.368s
    user 0m3.380s
    sys 0m0.507s

    What do you think? Thanks in advance.

    (I made this exact comment here http://code.saghul.net/index.php/2012/09/07/trying-out-concurrent-futures/)
