Distributed computing in Python with multiprocessing

January 24th, 2012 at 5:23 am

In the previous post, I discussed how the multiprocessing package can be used to run CPU-bound computation tasks in parallel on a multi-core machine. But the utility of multiprocessing doesn’t end here. It can also be used to run computations distributed over several machines.

This enters the exciting domain of distributed computing. There are many tools available for addressing various aspects of this domain, but here I want to specifically focus on what Python offers right in the standard library, with multiprocessing. The part of the package that makes distributed computing possible is called "managers".

The documentation of multiprocessing.managers leaves something to be desired. It’s not entirely clear what the full capabilities of this tool are from just skimming the docs. For example, it starts by saying:

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

This is somewhat confusing, since multiprocessing already provides shared objects and synchronization primitives without managers (for example Value and Lock). So why are managers needed?

For two main reasons:

  1. Managers provide additional synchronization tools, such as a list or a dictionary that can be shared between processes.
  2. Managers allow their synchronized objects to be used between processes running across a network, and not just on the same machine. This is why, for example, managers also provide a Lock, which at first sight appears to duplicate multiprocessing.Lock. It isn’t a duplication: multiprocessing.Lock is only available to processes running on the same machine, while multiprocessing.managers.SyncManager.Lock can be shared across machines (which is also why it’s slower, since every acquire and release is a request to the manager’s server).
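Both points can be seen in a small sketch. It assumes Python 3 (the post’s code is Python 2), and to stay self-contained it runs the SyncManager server in a background thread via get_server().serve_forever() instead of in a child process via start(). The loopback address, the OS-chosen port and AUTHKEY are placeholders. The dict and the Lock live in the server; every operation on their proxies is a request over a TCP connection, and each thread opens its own connection, much as separate client processes (or machines) would.

```python
import threading
from multiprocessing.managers import SyncManager

AUTHKEY = b"placeholder-key"  # assumption: any bytes value shared by server and clients


def start_server_in_thread():
    """Start a SyncManager server in a daemon thread and return its (host, port).

    Demo shortcut: manager.start() would run the server in a child process.
    """
    manager = SyncManager(address=("127.0.0.1", 0), authkey=AUTHKEY)  # port 0: OS picks
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def count_with_shared_dict(nthreads=4, increments=50):
    client = SyncManager(address=start_server_in_thread(), authkey=AUTHKEY)
    client.connect()             # the same call a client on another machine would make
    counts = client.dict()       # the real dict lives in the server; this is a proxy
    lock = client.Lock()         # a server-side Lock; acquire/release are forwarded
    counts["n"] = 0

    def bump():
        for _ in range(increments):
            with lock:           # read-modify-write must be atomic across clients
                counts["n"] = counts["n"] + 1

    threads = [threading.Thread(target=bump) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts["n"]


if __name__ == "__main__":
    print(count_with_shared_dict())  # 200: 4 threads x 50 increments
```

Without the Lock, two clients could read the same value and both write back value + 1, losing an increment.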

I don’t want to delve too far into the synchronization primitives; instead I’ll focus on the distributed computing made possible by managers.

The task will be the same as before – factoring lists of numbers. The worker function is:

import Queue  # Python 2 module name (it is called queue in Python 3)

def factorizer_worker(job_q, result_q):
    """ A worker function to be launched in a separate process. Takes jobs from
        job_q - each job a list of numbers to factorize. When the job is done,
        the result (dict mapping number -> list of factors) is placed into
        result_q. Runs until job_q is empty.
    """
    while True:
        try:
            job = job_q.get_nowait()
        except Queue.Empty:
            # No jobs left: the worker is done
            return
        # factorize_naive is the function from the previous post.
        # Dict comprehensions require Python 2.7 or later.
        outdict = {n: factorize_naive(n) for n in job}
        result_q.put(outdict)

This worker runs in a single process on some machine. All it cares about is that it has a queue of "jobs" to look at, and a queue of results to write into. Each job in the queue is a list of numbers to factorize. Once the worker has finished factoring these numbers, it puts a result dict into the result queue. The worker stops and returns when it notices that the job queue is empty. That’s it.
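Since the worker only talks to its two queues, it can be checked without any processes or network. A minimal Python 3 sketch, assuming plain queue.Queue objects; trial_division is a hypothetical stand-in for factorize_naive from the previous post, and the try block covers only the get_nowait call, so that only an empty job queue ends the loop.

```python
import queue


def trial_division(n):
    """Hypothetical stand-in for factorize_naive: prime factors with multiplicity."""
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)  # whatever remains is prime
    return factors


def factorizer_worker(job_q, result_q):
    """Same loop as the post: drain job_q, put one result dict per job."""
    while True:
        try:
            job = job_q.get_nowait()
        except queue.Empty:
            return  # no jobs left: the worker exits
        result_q.put({n: trial_division(n) for n in job})


if __name__ == "__main__":
    jobs, results = queue.Queue(), queue.Queue()
    jobs.put([12, 13])
    jobs.put([100])
    factorizer_worker(jobs, results)
    while not results.empty():
        print(results.get())  # {12: [2, 2, 3], 13: [13]} then {100: [2, 2, 5, 5]}
```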

Adding an abstraction level, here’s how this worker can be used:

import multiprocessing

def mp_factorizer(shared_job_q, shared_result_q, nprocs):
    """ Split the work with jobs in shared_job_q and results in
        shared_result_q into several processes. Launch each process with
        factorizer_worker as the worker function, and wait until all are
        finished.
    """
    procs = []
    for _ in range(nprocs):
        p = multiprocessing.Process(
                target=factorizer_worker,
                args=(shared_job_q, shared_result_q))
        procs.append(p)
        p.start()

    # Each worker exits once it finds the job queue empty
    for p in procs:
        p.join()

mp_factorizer takes the same pair of queues and the number of processes to create. It then uses multiprocessing.Process to spawn several workers, each into a process of its own. When all the workers are done, mp_factorizer exits. Note how this code is still independent of where it actually executes – its interface with the world is via the job and result queues.
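To illustrate that the driver depends only on the queues, here is a sketch that keeps mp_factorizer’s structure but swaps multiprocessing.Process for threading.Thread, so it runs in one Python 3 process with plain queue.Queue objects. Threads will not speed up CPU-bound Python code, so this only demonstrates the control flow; smallest_factor and threaded_factorizer are hypothetical names.

```python
import queue
import threading


def smallest_factor(n):
    """Hypothetical placeholder for real factorization work (n >= 2)."""
    return next(p for p in range(2, n + 1) if n % p == 0)


def worker(job_q, result_q):
    while True:
        try:
            job = job_q.get_nowait()
        except queue.Empty:
            return
        result_q.put({n: smallest_factor(n) for n in job})


def threaded_factorizer(job_q, result_q, nworkers):
    """Like mp_factorizer, but each worker is a thread instead of a process."""
    workers = [threading.Thread(target=worker, args=(job_q, result_q))
               for _ in range(nworkers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # returns once every worker has seen an empty job queue


if __name__ == "__main__":
    job_q, result_q = queue.Queue(), queue.Queue()
    for chunk in ([15, 21], [35, 49], [77]):
        job_q.put(chunk)
    threaded_factorizer(job_q, result_q, nworkers=3)
    merged = {}
    while not result_q.empty():
        merged.update(result_q.get())
    print(merged)
```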

There’s nothing really new here, so let’s get to the interesting stuff – starting a server that manages the shared queues:

import time

def runserver():
    # Start a shared manager server and access its queues.
    # PORTNUM and AUTHKEY are module-level settings shared with the clients.
    manager = make_server_manager(PORTNUM, AUTHKEY)
    shared_job_q = manager.get_job_q()
    shared_result_q = manager.get_result_q()

    N = 999
    nums = make_nums(N)

    # The numbers are split into chunks. Each chunk is pushed into the job
    # queue.
    chunksize = 43
    for i in range(0, len(nums), chunksize):
        shared_job_q.put(nums[i:i + chunksize])

    # Wait until all results are ready in shared_result_q: one result per
    # input number.
    numresults = 0
    resultdict = {}
    while numresults < len(nums):
        outdict = shared_result_q.get()
        resultdict.update(outdict)
        numresults += len(outdict)

    # Sleep a bit before shutting down the server - to give clients time to
    # realize the job queue is empty and exit in an orderly way.
    time.sleep(2)
    manager.shutdown()

What this code does is:

  1. Create the manager (which actually starts the server running in the background) – more on this step later
  2. Generate some input numbers and break them into chunks
  3. Feed the job queue with chunks of numbers for the workers to churn on
  4. Wait until the expected number of results has been placed in the result queue
  5. Shut down the server and exit
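Steps 2 to 4 do not depend on the manager, so they can be sketched with local queues. In this hypothetical Python 3 version, push_chunks and collect_results mirror the two loops in runserver, and a few inline lines play the role of the remote workers. The collection loop counts results against len(nums), which keeps it in step with however many numbers were actually queued.

```python
import queue


def push_chunks(nums, job_q, chunksize):
    """Step 3: push consecutive slices of nums; return how many chunks were queued."""
    nchunks = 0
    for i in range(0, len(nums), chunksize):
        job_q.put(nums[i:i + chunksize])
        nchunks += 1
    return nchunks


def collect_results(result_q, expected):
    """Step 4: block on result_q until `expected` numbers have been answered."""
    numresults = 0
    resultdict = {}
    while numresults < expected:
        outdict = result_q.get()  # blocks until some worker reports
        resultdict.update(outdict)
        numresults += len(outdict)
    return resultdict


if __name__ == "__main__":
    nums = list(range(100, 110))  # 10 small inputs instead of make_nums
    job_q, result_q = queue.Queue(), queue.Queue()
    print(push_chunks(nums, job_q, chunksize=4))  # 3 chunks: 4 + 4 + 2 numbers
    # Stand-in for the remote workers: answer each chunk with n -> n % 7
    while not job_q.empty():
        chunk = job_q.get()
        result_q.put({n: n % 7 for n in chunk})
    print(len(collect_results(result_q, expected=len(nums))))  # 10
```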

Note that no computation is actually performed in the server – it just manages the sharing for clients. And this is make_server_manager:

import Queue
from multiprocessing.managers import SyncManager

def make_server_manager(port, authkey):
    """ Create a manager for the server, listening on the given port.
        Return a manager object with get_job_q and get_result_q methods.
    """
    job_q = Queue.Queue()
    result_q = Queue.Queue()

    # This is based on the examples in the official docs of multiprocessing.
    # get_{job|result}_q return synchronized proxies for the actual Queue
    # objects.
    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

I won’t explain what each line of this code does – it’s all available in the documentation page of multiprocessing. I’ll just note that the manager starts a TCP server at the given port, running in the background, and uses this server to let clients access its internal objects – in this case a couple of queues.
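For readers on Python 3, here is a hedged port of the server side. Two changes are assumptions worth stating: authkey must be bytes, and the queues, their getter functions and the manager class are defined at module level instead of as lambdas and a local class. With the spawn start method (the default on Windows and macOS), manager.start() has to pickle the registry, and lambdas or locally defined classes cannot be pickled. BaseManager is used because only the two registered queues are needed; a SyncManager subclass, as in the post, works the same way. For the demo, serve_in_thread (a hypothetical helper) runs the server in a daemon thread so that the real queue can be inspected directly.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

# Module-level queues and getters: unlike lambdas, these can be pickled if
# manager.start() launches the server with the "spawn" start method.
job_q = queue.Queue()
result_q = queue.Queue()


def get_job_q():
    return job_q


def get_result_q():
    return result_q


class JobQueueManager(BaseManager):
    pass


JobQueueManager.register("get_job_q", callable=get_job_q)
JobQueueManager.register("get_result_q", callable=get_result_q)


def make_server_manager(port, authkey):
    """Python 3 shape of the post's function; start() runs a server process."""
    manager = JobQueueManager(address=("", port), authkey=authkey)
    manager.start()
    print("Server started at port %s" % port)
    return manager


def serve_in_thread(port, authkey):
    """Hypothetical demo helper: serve from a daemon thread of this process."""
    server = JobQueueManager(address=("127.0.0.1", port), authkey=authkey).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


if __name__ == "__main__":
    key = b"placeholder-key"
    client = JobQueueManager(address=serve_in_thread(0, key), authkey=key)
    client.connect()
    client.get_job_q().put([1, 2, 3])  # sent over TCP to the server thread
    print(job_q.get_nowait())          # [1, 2, 3]: the real queue, read directly
```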

Finally, to complete the puzzle here’s the make_nums utility function. Nothing smart about it:

def make_nums(N):
    """ Create N large numbers to factorize.
    """
    nums = [999999999999]
    # The list starts with one number, so N - 1 more are appended
    for i in xrange(N - 1):
        nums.append(nums[-1] + 2)
    return nums

Alright, so this is the server. It will run, put input into the job queue and then wait for results to start trickling into the result queue. How would they get there though? From clients. Here’s a simple client:

def runclient():
    # IP is the server's address; PORTNUM and AUTHKEY must match the server's
    manager = make_client_manager(IP, PORTNUM, AUTHKEY)
    job_q = manager.get_job_q()
    result_q = manager.get_result_q()
    mp_factorizer(job_q, result_q, 4)

The client accesses the server by means of another manager object. It then asks for the queues and just runs mp_factorizer (with nprocs=4). The client’s manager is this:

from multiprocessing.managers import SyncManager

def make_client_manager(ip, port, authkey):
    """ Create a manager for a client. This manager connects to a server on the
        given address and exposes the get_job_q and get_result_q methods for
        accessing the shared queues from the server.
        Return a manager object.
    """
    class ServerQueueManager(SyncManager):
        pass

    # Only the names are registered: the server supplies the actual objects
    ServerQueueManager.register('get_job_q')
    ServerQueueManager.register('get_result_q')

    manager = ServerQueueManager(address=(ip, port), authkey=authkey)
    manager.connect()

    print 'Client connected to %s:%s' % (ip, port)
    return manager

This manager is simpler. Instead of starting a server, it connects to one (given an IP address, port and authorization key). The get_*_q methods still have to be registered, but without a callable: on the client side, registration only tells the manager that these names are part of the protocol, and calling them asks the server for a proxy to its queue. Think of it as a kind of RPC.

This client can be executed on the same machine as the server, or on a different machine, which can be located anywhere as long as it can reach the server by IP address. It will connect to the server and start pulling work from the job queue, placing results into the result queue. In principle, any number of clients can connect simultaneously. The beauty of this method is that it only uses the Python standard library, so the code is very much platform independent. I had a Windows client machine connecting to a Linux server, which also had a client running, happily sharing the work between them. It just works.
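Several clients can drain the same job queue because each one opens its own connection to the server. This sketch simulates that in one Python 3 process: each client is a thread with its own connected QueueManager (a hypothetical name), the server runs in a daemon thread, and squaring stands in for factorization. As in the post, clients exit as soon as they find the job queue empty, so all jobs are queued before any client starts.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

AUTHKEY = b"placeholder-key"  # assumption: any bytes value shared by all parties
job_q, result_q = queue.Queue(), queue.Queue()  # owned by the server


def get_job_q():
    return job_q


def get_result_q():
    return result_q


class QueueManager(BaseManager):
    pass


QueueManager.register("get_job_q", callable=get_job_q)
QueueManager.register("get_result_q", callable=get_result_q)


def run_client(address):
    """One client: its own manager connection, then the worker loop."""
    manager = QueueManager(address=address, authkey=AUTHKEY)
    manager.connect()
    jobs, results = manager.get_job_q(), manager.get_result_q()
    while True:
        try:
            chunk = jobs.get_nowait()
        except queue.Empty:
            return
        results.put({n: n * n for n in chunk})  # squaring stands in for factoring


def distribute(nums, nclients=3, chunksize=5):
    server = QueueManager(address=("127.0.0.1", 0), authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    for i in range(0, len(nums), chunksize):
        job_q.put(nums[i:i + chunksize])  # queue every job before clients start
    clients = [threading.Thread(target=run_client, args=(server.address,))
               for _ in range(nclients)]
    for c in clients:
        c.start()
    merged, numresults = {}, 0
    while numresults < len(nums):
        outdict = result_q.get()  # the server side reads its own queue directly
        merged.update(outdict)
        numresults += len(outdict)
    for c in clients:
        c.join()
    return merged


if __name__ == "__main__":
    print(distribute(list(range(1, 11))))
```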

To summarize, I want to stress once again the goal of this post. Lest there be any misunderstanding, I’m not claiming this is the best way to do distributed programming in Python. It wouldn’t be easy to find the "best way", since it’s a complex problem domain with many tradeoffs, and many solutions that optimize for different things.

However, it is useful to know that such capabilities exist in the Python standard library. The multiprocessing package provides many useful building blocks. These can be used together or separately to implement all kinds of interesting solutions, both for parallelizing work across multiple processes and for distributing it across different machines. All of this, as you saw above, without writing too much code.

19 Responses to “Distributed computing in Python with multiprocessing”

  1. gmon Says:

    I was wondering: is there any benefit to using this method over a library like parallelpython? Is this method faster or more efficient?

  2. eliben Says:

    gmon,

    I’m not familiar with the parallelpython library, but one benefit is that this method doesn’t require any library to be installed.

  3. Matt Says:

    Eli,
    Just curious, I’m working on a Python based distributed test harness. Have you built anything substantial using multiprocessing in a distributed environment? As you mentioned, the doc for multiprocessing is pretty bad, but I’m also wondering if it’s robust enough to actually use in the real world.

    I’ve looked at a bunch of the available libraries/toolsets and at this point, I think Rpyc works best for my needs.

    Awesome blog btw!

    Thanks,
    Matt

  4. eliben Says:

    Matt,

    Personally I haven’t used it for any large project, but I know that many projects use it. For example, Celery does. Since multiprocessing is in the standard library and is a convenient low-level building block, I would not hesitate to use it. It has a big chance of being more robust than any random library you find online.

  5. bilbo Says:

    Thanks for the examples, you’re not wrong about the docs being lacking in this respect.

    Any particular reason why you’re using a Queue.Queue instead of a multiprocessing.Queue?

  6. eliben Says:

    bilbo,

    No special reason.

  7. Bagi Says:

    Hi! Could you post all the relevant code, including all import statements and a sequence of calls to the functions, in one file (or maybe 2 if the client needs to be run separately)? I am new to multiprocessing, and not so experienced in Python itself, and am getting interpreter errors with my attempt.

  8. eliben Says:

    Bagi,

    Sure, here it is: http://paste.pocoo.org/show/552680/

    It’s not too tidy though.

  9. Bagi Says:

    Hi Eli,
    Many thanks for posting the code. There were some issues that I could figure out and the code runs smoothly now on a single CPU multi core machine. I will try it on a batch next.

    I note the issues I had below, just fyi. One of those could be an issue with my python version (2.6.6)

    1. The statement to create the dict (line 49) in factorizer_worker(…) did not work for me and gave a syntax error. The statement that worked was –
    outdict = dict([(n, factorize_naive(n)) for n in job])
    Is this due to differences in the python version, or is it indeed true that there is a syntax error in the original line?
    2. Your library eblib.utils, which is referred to in the last import statement, I found after searching on Google, at
    http://ottawarobotics.org/subversion/Members/andrzej/PYTHON/PyDaq/src/PyDaq/Sandbox/Eli_Benderskys_demos/SerialMon/eblib/
    3. For your factorize library I found the bits of code on a previous post.

    Many thanks once again for the very nice blog posts you have and for your reply to my queries.
    Cheers,
    Bagi

  10. Bagi Says:

    Hi!,
    I have a question now. In the post you mention that “Note that no computation is actually performed in the server – it just manages the sharing for clients.” However, I see that the mp_factorizer(…) function is called in both the runserver() and the runclient() functions. As a result, the server actually does all the computation, with or without a client being run.
    I think mp_factorizer(…) should only be called from the runclient() function. I commented out line 90 in the code you posted, then ran a ‘server’ in one terminal and a ‘client’ (by passing ‘client’ as a command-line argument) in a second terminal, and got the behaviour described in the post.
    What do you think?
    Cheers,
    Bagi

  11. eliben Says:

    Bagi,

    Yep, the full code is more general. The server may as well do something, right? :-) Just managing the queue doesn’t take much time from its CPU, so it can do some computations on its own.

    As for the issues you hit. I think the first one is because dict comprehensions are new in 2.7, which is the version I’m using (and you should too!). The others – true, this is why I didn’t originally release the full source into the blog post. It builds on top of some utils I have lying around.

  12. Chestnut Says:

    Hi — Thank you so much for your tutorial!

    Unfortunately, the pocoo pastebin has been shut down, so that your complete code is no longer available. Would it be possible for you to repost it, possibly as a gist?

    Sorry for the trouble and thanks again!

    Chestnut

  13. eliben Says:

    Chestnut,

    Here it is: https://gist.github.com/2798682

  14. Nisan Says:

    Hi,
    I just wanted to chime in with a thank you!
    This example is much more comprehensive than the RemoteManager one in the multiprocessing docs, not to mention that the docs version makes bad use (e.g. blocking the parent process) of the serve_forever() method, rather than actually using the manager’s start() and shutdown() methods.

  15. Yuhang Says:

    Hi,

    Thank you for this tutorial.

    I ran into a problem using the framework. The server machine starts to complain about “too many connections” once I increase the number of processes running on the client machine beyond 500. Is there a limit on the volume of inter-process communication? Is there a good way to debug this problem?

    Thanks.

  16. Oscar Martinez Rubi Says:

    Very nice tutorial! everything is clear!
    great work!
    thanks!

  17. Ramon Iglesias Says:

    This helped A LOT!
    Thanks!
    (Just had to work around some picklability trouble)

  18. Wintch Says:

    Thanks! Very helpful as an example of what Python can do in terms of MP

  19. Rahul Says:

    Thanks much for this Eli, awesome post with very clear explanation.
