Distributed computing in Python with multiprocessing
January 24th, 2012 at 5:23 am
In the previous post, I discussed how the multiprocessing package can be used to run CPU-bound computation tasks in parallel on a multi-core machine. But the utility of multiprocessing doesn’t end here. It can also be used to run computations distributed over several machines.
This enters the exciting domain of distributed computing. There are many tools available for addressing various aspects of this domain, but here I want to specifically focus on what Python offers right in the standard library, with multiprocessing. The part of the package that makes distributed computing possible is called "managers".
The documentation of multiprocessing.managers leaves something to be desired. It’s not entirely clear what the full capabilities of this tool are from just skimming the docs. For example, it starts by saying:
Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
This is somewhat confusing, since multiprocessing already has synchronization primitives that work without managers (for example, Value and Lock). So why are managers needed?
For two main reasons:
- Managers provide additional synchronization tools, such as a list or a dictionary that can be shared between processes.
- Managers allow their synchronized objects to be used between processes running across a network, and not just on the same machine. This is why, for example, managers also provide a Lock, which at first sight appears to duplicate multiprocessing.Lock. It isn’t a duplication: multiprocessing.Lock is only available to processes running on the same machine, while multiprocessing.managers.SyncManager.Lock can be shared across machines (which is also why it’s slower).
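To make the second point concrete, here is a small Python 3 sketch (not code from this post) in which a SyncManager server runs in a background thread of the same process, standing in for a remote machine, and a client reaches a list, a dict and a Lock that live in that server over a TCP connection. The authkey value and the use of port 0 (let the OS pick a free port) are assumptions made for the demo.

```python
import threading
from multiprocessing.managers import SyncManager

AUTHKEY = b"demo-key"  # hypothetical shared secret; must be bytes in Python 3


def start_sync_server():
    """Run a SyncManager server in a daemon thread and return its (host, port).

    Port 0 asks the OS for any free port; a real deployment would use a fixed
    port on a machine the clients can reach.
    """
    manager = SyncManager(address=("127.0.0.1", 0), authkey=AUTHKEY)
    server = manager.get_server()  # creates the listening socket
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def use_shared_objects(address):
    """Connect as a client and use the server-side list, dict and Lock."""
    client = SyncManager(address=address, authkey=AUTHKEY)
    client.connect()
    shared_list = client.list([1, 2])  # the list itself lives in the server
    shared_dict = client.dict()
    lock = client.Lock()  # unlike multiprocessing.Lock, reachable over TCP
    with lock:
        shared_list.append(3)
        shared_dict["total"] = sum(shared_list[:])  # [:] fetches a plain copy
    return shared_list[:], shared_dict.copy()


if __name__ == "__main__":
    print(use_shared_objects(start_sync_server()))
    # ([1, 2, 3], {'total': 6})
```

Every operation on these proxies is a request to the server, which is where the extra cost of the manager's Lock comes from.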
I don’t want to delve too far into the synchronization primitives, and instead focus on the distributed computing made possible by managers.
The task will be the same as before – factoring lists of numbers. The worker function is:
```python
import Queue  # named "queue" in Python 3

# factorize_naive(n) is the helper from the previous post; it returns the
# list of factors of n.

def factorizer_worker(job_q, result_q):
    """ A worker function to be launched in a separate process. Takes jobs from
        job_q - each job a list of numbers to factorize. When the job is done,
        the result (dict mapping number -> list of factors) is placed into
        result_q. Runs until job_q is empty.
    """
    while True:
        try:
            job = job_q.get_nowait()
            # Dict comprehensions require Python 2.7 or later.
            outdict = {n: factorize_naive(n) for n in job}
            result_q.put(outdict)
        except Queue.Empty:
            return
```
This worker runs in a single process on some machine. All it cares about is that it has a queue of "jobs" to look at, and a queue of results to write into. Each job in the queue is a list of numbers to factorize. Once the worker has finished factoring these numbers, it puts a result dict into the result queue. The worker stops and returns when it notices that the job queue is empty. That’s it.
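For readers on Python 3, here is a minimal, self-contained sketch of the same worker. The module is called queue there, and since factorize_naive comes from the previous post, a simple trial-division stand-in is defined here; treat it as an assumption rather than the original helper. Because the worker only talks to its two queues, it can be exercised with ordinary in-process queue.Queue objects.

```python
import queue


def factorize_naive(n):
    """Hypothetical stand-in for the previous post's helper: trial division."""
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)  # whatever remains is prime
    return factors


def factorizer_worker(job_q, result_q):
    """Python 3 port: drain job_q, putting one {number: factors} dict per job."""
    while True:
        try:
            job = job_q.get_nowait()
        except queue.Empty:
            return  # no work left
        result_q.put({n: factorize_naive(n) for n in job})


if __name__ == "__main__":
    job_q, result_q = queue.Queue(), queue.Queue()
    for chunk in ([12, 13], [30]):
        job_q.put(chunk)
    factorizer_worker(job_q, result_q)
    results = {}
    while not result_q.empty():
        results.update(result_q.get())
    print(results)  # {12: [2, 2, 3], 13: [13], 30: [2, 3, 5]}
```

Keeping the try block around get_nowait() alone means only an empty job queue ends the loop.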
Adding an abstraction level, here’s how this worker can be used:
```python
import multiprocessing

def mp_factorizer(shared_job_q, shared_result_q, nprocs):
    """ Split the work with jobs in shared_job_q and results in
        shared_result_q into several processes. Launch each process with
        factorizer_worker as the worker function, and wait until all are
        finished.
    """
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
                target=factorizer_worker,
                args=(shared_job_q, shared_result_q))
        procs.append(p)
        p.start()

    # Block until every worker has found the job queue empty and returned.
    for p in procs:
        p.join()
```
mp_factorizer takes the same pair of queues and the number of processes to create. It then uses multiprocessing.Process to spawn several workers, each into a process of its own. When all the workers are done, mp_factorizer exits. Note how this code is still independent of where it actually executes – its interface with the world is via the job and result queues.
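One way to see that mp_factorizer depends only on its two queues is to swap what it launches. The Python 3 sketch below adds a hypothetical process_cls parameter (not in the post) and drives the function with multiprocessing.dummy.Process, a thread-backed class with the same start()/join() interface, over plain queue.Queue objects. This works only because threads share memory: with real multiprocessing.Process workers the queues must be shareable between processes, like the manager proxies used in this post. factorize_naive is again a trial-division stand-in for the previous post's helper.

```python
import multiprocessing
import multiprocessing.dummy  # thread-backed Process with the same interface
import queue


def factorize_naive(n):
    """Trial-division stand-in for the previous post's helper (assumption)."""
    factors, p = [], 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)
    return factors


def factorizer_worker(job_q, result_q):
    while True:
        try:
            job = job_q.get_nowait()
        except queue.Empty:
            return
        result_q.put({n: factorize_naive(n) for n in job})


def mp_factorizer(shared_job_q, shared_result_q, nprocs,
                  process_cls=multiprocessing.Process):
    """Like the post's version, except the hypothetical process_cls argument
    chooses what kind of worker to launch."""
    procs = [process_cls(target=factorizer_worker,
                         args=(shared_job_q, shared_result_q))
             for _ in range(nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == "__main__":
    job_q, result_q = queue.Queue(), queue.Queue()
    for chunk in ([4, 9], [10, 21], [35]):
        job_q.put(chunk)
    # Threads share in-process queues, so no pickling or IPC happens here.
    mp_factorizer(job_q, result_q, nprocs=2,
                  process_cls=multiprocessing.dummy.Process)
    results = {}
    while not result_q.empty():
        results.update(result_q.get())
    print(sorted(results.items()))
    # [(4, [2, 2]), (9, [3, 3]), (10, [2, 5]), (21, [3, 7]), (35, [5, 7])]
```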
There’s nothing really new here, so let’s get to the interesting stuff – starting a server that manages the shared queues:
```python
import time

# PORTNUM (an int) and AUTHKEY (a string) are module-level settings that the
# server and all of its clients must agree on.

def runserver():
    # Start a shared manager server and access its queues
    manager = make_server_manager(PORTNUM, AUTHKEY)
    shared_job_q = manager.get_job_q()
    shared_result_q = manager.get_result_q()

    N = 999
    nums = make_nums(N)

    # The numbers are split into chunks. Each chunk is pushed into the job
    # queue.
    chunksize = 43
    for i in range(0, len(nums), chunksize):
        shared_job_q.put(nums[i:i + chunksize])

    # Wait until all results are ready in shared_result_q. Counting against
    # len(nums) waits for exactly as many results as numbers were queued.
    numresults = 0
    resultdict = {}
    while numresults < len(nums):
        outdict = shared_result_q.get()
        resultdict.update(outdict)
        numresults += len(outdict)

    # Sleep a bit before shutting down the server - to give clients time to
    # realize the job queue is empty and exit in an orderly way.
    time.sleep(2)
    manager.shutdown()
```
What this code does is:
- Create the manager (which actually starts the server running in the background) – more on this step later
- Generate some input numbers and break them into chunks
- Feed the job queue with chunks of numbers for the workers to churn on
- Wait until the expected amount of results has been placed in the result queue
- Shut down the server and exit
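The feed-and-collect steps above can be isolated into two small functions. The Python 3 sketch below is illustrative only: feed_jobs and collect_results are hypothetical names, and a local queue.Queue stands in for the shared queues. Counting the numbers received, rather than the chunks, is what tells the server it has everything, even though chunks complete in arbitrary order and the last chunk is usually shorter.

```python
import queue


def feed_jobs(job_q, nums, chunksize):
    """Put nums into job_q as consecutive slices; return how many were put."""
    nchunks = 0
    for i in range(0, len(nums), chunksize):
        job_q.put(nums[i:i + chunksize])
        nchunks += 1
    return nchunks


def collect_results(result_q, expected):
    """Merge result dicts until `expected` numbers have been accounted for."""
    numresults, resultdict = 0, {}
    while numresults < expected:
        outdict = result_q.get()  # blocks until some client delivers a dict
        resultdict.update(outdict)
        numresults += len(outdict)
    return resultdict


if __name__ == "__main__":
    nums = list(range(1000))
    job_q, result_q = queue.Queue(), queue.Queue()
    print(feed_jobs(job_q, nums, 43))  # 24: 23 chunks of 43, then one of 11
    # Stand-in for the clients: answer each chunk with a trivial dict.
    while not job_q.empty():
        result_q.put({n: [n] for n in job_q.get()})
    print(len(collect_results(result_q, len(nums))))  # 1000
```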
Note that no computation is actually performed in the server – it just manages the sharing for clients. And this is make_server_manager:
```python
import Queue
from multiprocessing.managers import SyncManager

def make_server_manager(port, authkey):
    """ Create a manager for the server, listening on the given port.
        Return a manager object with get_job_q and get_result_q methods.
    """
    job_q = Queue.Queue()
    result_q = Queue.Queue()

    # This is based on the examples in the official docs of multiprocessing.
    # get_{job|result}_q return synchronized proxies for the actual Queue
    # objects.
    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    # start() runs the server in a child process. The lambdas above are fine
    # when that process is forked (as on Linux), but cannot be pickled for a
    # spawned one (as on Windows).
    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager
```
I won’t explain what each line of this code does – it’s all available in the documentation page of multiprocessing. I’ll just note that the manager starts a TCP server at the given port, running in the background, and uses this server to let clients access its internal objects – in this case a couple of queues.
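A Python 3 adaptation might look like the sketch below. Two changes are my assumptions rather than the post's code: the authkey is bytes, and the server is hosted in a daemon thread via get_server() and serve_forever() instead of start(), which sidesteps pickling the lambdas when processes are spawned rather than forked. The function then connects a manager to its own server, so callers still get get_job_q and get_result_q. Such a connected manager has no shutdown() method (that exists only after start()), so runserver's final shutdown() call does not apply; the daemon thread simply ends with the process.

```python
import queue
import threading
from multiprocessing.managers import SyncManager


def make_server_manager(port, authkey):
    """Serve a job queue and a result queue on `port` from a daemon thread
    and return a manager connected to that server."""
    job_q, result_q = queue.Queue(), queue.Queue()

    class JobQueueManager(SyncManager):
        pass

    # These callables run inside the server; clients only ever get proxies.
    JobQueueManager.register("get_job_q", callable=lambda: job_q)
    JobQueueManager.register("get_result_q", callable=lambda: result_q)
    server = JobQueueManager(address=("", port), authkey=authkey).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    port = server.address[1]  # the real port, in case 0 was requested
    print("Server started at port %s" % port)

    manager = JobQueueManager(address=("127.0.0.1", port), authkey=authkey)
    manager.connect()
    return manager


if __name__ == "__main__":
    manager = make_server_manager(0, b"abracadabra")  # 0: OS picks a free port
    manager.get_job_q().put([999999999999, 1000000000001])
    print(manager.get_job_q().qsize())  # 1
```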
Finally, to complete the puzzle here’s the make_nums utility function. Nothing smart about it:
```python
def make_nums(N):
    """ Create N large numbers to factorize.
    """
    nums = [999999999999]
    # The list already holds one number, so N - 1 more are appended.
    for i in xrange(N - 1):
        nums.append(nums[-1] + 2)
    return nums
```
Alright, so this is the server. It will run, put input into the job queue and then wait for results to start trickling into the result queue. How would they get there though? From clients. Here’s a simple client:
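```python
def runclient():
    # IP is the address of the machine running runserver(); PORTNUM and
    # AUTHKEY must match the server's settings.
    manager = make_client_manager(IP, PORTNUM, AUTHKEY)
    job_q = manager.get_job_q()
    result_q = manager.get_result_q()
    mp_factorizer(job_q, result_q, 4)
```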
The client accesses the server by means of another manager object. It then asks for the queues and just runs mp_factorizer (with nprocs=4). The client’s manager is this:
```python
from multiprocessing.managers import SyncManager

def make_client_manager(ip, port, authkey):
    """ Create a manager for a client. This manager connects to a server on the
        given address and exposes the get_job_q and get_result_q methods for
        accessing the shared queues from the server.
        Return a manager object.
    """
    class ServerQueueManager(SyncManager):
        pass

    # Only the names are registered here; the server owns the actual queues.
    ServerQueueManager.register('get_job_q')
    ServerQueueManager.register('get_result_q')

    manager = ServerQueueManager(address=(ip, port), authkey=authkey)
    manager.connect()

    print 'Client connected to %s:%s' % (ip, port)
    return manager
```
This manager is simpler. Instead of starting a server, it connects to one (given an IP address, port and authorization key). A similar method has to be used to register the get_*_q methods, just to let the manager know they are part of the protocol. Think of it as a kind of RPC.
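The sketch below (Python 3, not the post's code) shows the client half in action against a server hosted in a daemon thread of the same process, which stands in for the remote machine. ServerQueueManager registers only the names get_job_q and get_result_q; calling one sends a request over the socket, the server runs the callable registered on its side, and the client receives a proxy. The loopback address, the authkey and the squaring "work" are placeholders.

```python
import queue
import threading
from multiprocessing.managers import SyncManager

AUTHKEY = b"abracadabra"  # hypothetical shared secret (bytes in Python 3)


def start_server():
    """Server half: host two queues in a daemon thread; return the port."""
    job_q, result_q = queue.Queue(), queue.Queue()

    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register("get_job_q", callable=lambda: job_q)
    JobQueueManager.register("get_result_q", callable=lambda: result_q)
    server = JobQueueManager(address=("127.0.0.1", 0),
                             authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address[1]


def make_client_manager(ip, port, authkey):
    """Client half: register only the names (no callables), then connect."""

    class ServerQueueManager(SyncManager):
        pass

    ServerQueueManager.register("get_job_q")
    ServerQueueManager.register("get_result_q")
    manager = ServerQueueManager(address=(ip, port), authkey=authkey)
    manager.connect()
    return manager


if __name__ == "__main__":
    port = start_server()
    client = make_client_manager("127.0.0.1", port, AUTHKEY)
    job_q, result_q = client.get_job_q(), client.get_result_q()
    job_q.put([2, 3])  # each method call is a request to the server
    job = job_q.get()
    result_q.put({n: n * n for n in job})  # placeholder for real work
    print(result_q.get())  # {2: 4, 3: 9}
```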
This client can be executed on the same machine as the server, or on a different machine, which can be located anywhere as long as it can reach the server by IP address. It will connect to the server and start pulling work from the job queue, placing results into the result queue. Theoretically, any number of clients can connect simultaneously. The beauty of this method is that it only uses the Python standard library, so the code is very much platform independent. I had a Windows client machine connecting to a Linux server, which also had a client running, happily sharing the work between them. It just works.
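To illustrate several clients sharing one server, the Python 3 sketch below starts a thread-hosted server with ten chunks of work, then runs a few client threads, each with its own manager and TCP connection, as separate machines would have. Threads stand in for machines only to keep the demo in one process; the squaring placeholder work, the authkey and the client names are assumptions. Because every get_nowait() is served by the single queue living in the server, each chunk goes to exactly one client.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

AUTHKEY = b"abracadabra"  # hypothetical shared secret


def start_server(jobs):
    """Queue up `jobs` and serve the two queues from a daemon thread.
    Returns the server's (host, port) and its local result queue."""
    job_q, result_q = queue.Queue(), queue.Queue()
    for job in jobs:
        job_q.put(job)

    class JobQueueManager(BaseManager):
        pass

    JobQueueManager.register("get_job_q", callable=lambda: job_q)
    JobQueueManager.register("get_result_q", callable=lambda: result_q)
    server = JobQueueManager(address=("127.0.0.1", 0),
                             authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address, result_q


class ServerQueueManager(BaseManager):
    pass


ServerQueueManager.register("get_job_q")
ServerQueueManager.register("get_result_q")


def client(address, name):
    """One simulated client machine with its own connection to the server."""
    manager = ServerQueueManager(address=address, authkey=AUTHKEY)
    manager.connect()
    job_q, result_q = manager.get_job_q(), manager.get_result_q()
    while True:
        try:
            job = job_q.get_nowait()  # queue.Empty is relayed from the server
        except queue.Empty:
            return
        result_q.put((name, {n: n * n for n in job}))  # placeholder work


def run(nclients=3):
    jobs = [list(range(i, i + 5)) for i in range(0, 50, 5)]  # 10 chunks
    address, result_q = start_server(jobs)
    threads = [threading.Thread(target=client, args=(address, "client-%d" % k))
               for k in range(nclients)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    merged, served_by = {}, set()
    while not result_q.empty():
        name, outdict = result_q.get()
        served_by.add(name)
        merged.update(outdict)
    return merged, served_by


if __name__ == "__main__":
    merged, served_by = run()
    print(len(merged), sorted(served_by))
```

Which clients end up doing the work depends on timing, so the list of names varies from run to run.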
To summarize, I want to stress once again the goal of this post. Lest there be any misunderstanding, I’m not claiming this is the best way to do distributed programming in Python. It wouldn’t be easy to find the "best way", since it’s a complex problem domain with many tradeoffs, and many solutions that optimize for different things.
However, it is useful to know that such capabilities exist in the Python standard library. The multiprocessing package provides many useful building blocks. These can be used together or separately to implement all kinds of interesting solutions both for parallelizing work across multiple processes and distributing it across different machines. All of this, as you saw above, without writing too much code.
January 25th, 2012 at 02:15
I was wondering: is there any benefit to using this method over a library like parallelpython? Is this method faster or more efficient?
January 25th, 2012 at 05:06
gmon,
I’m not familiar with the parallelpython library, but one benefit is that this method doesn’t require any library to be installed.
February 7th, 2012 at 21:42
Eli,
Just curious, I’m working on a Python-based distributed test harness. Have you built anything substantial using multiprocessing in a distributed environment? As you mentioned, the docs for multiprocessing are pretty bad, but I’m also wondering if it’s robust enough to actually use in the real world.
I’ve looked at a bunch of the available libraries/toolsets and at this point, I think Rpyc works best for my needs.
Awesome blog btw!
Thanks,
Matt
February 8th, 2012 at 05:31
Matt,
Personally I haven’t used it for any large project, but I know that many projects use it. For example, Celery does. Since multiprocessing is in the standard library and is a convenient low-level building block, I would not hesitate to use it. It has a good chance of being more robust than any random library you find online.
February 16th, 2012 at 11:58
Thanks for the examples, you’re not wrong about the docs being lacking in this respect.
Any particular reason why you’re using a Queue.Queue instead of a multiprocessing.Queue?
February 16th, 2012 at 12:14
bilbo,
No special reason.
February 17th, 2012 at 18:13
Hi! Could you post all the relevant code, including all import statements and a sequence of calls to the functions, in one file (or maybe two if the client needs to be run separately)? I am new to multiprocessing, and not so experienced in Python itself, and am getting interpreter errors with my attempt.
February 17th, 2012 at 20:39
Bagi,
Sure, here it is: http://paste.pocoo.org/show/552680/
It’s not too tidy though.
February 20th, 2012 at 15:00
Hi Eli,
Many thanks for posting the code. There were some issues that I could figure out and the code runs smoothly now on a single CPU multi core machine. I will try it on a batch next.
I note the issues I had below, just FYI. One of them could be due to my Python version (2.6.6).
1. The statement to create the dict (line 49) in factorizer_worker(…) did not work for me and gave a syntax error. The statement that worked was –
outdict = dict([(n, factorize_naive(n)) for n in job])
Is this due to differences in the python version, or is it indeed true that there is a syntax error in the original line?
2. Your library eblib.utils, which is referred to in the last import statement, I found after searching on Google, at
http://ottawarobotics.org/subversion/Members/andrzej/PYTHON/PyDaq/src/PyDaq/Sandbox/Eli_Benderskys_demos/SerialMon/eblib/
3. For your factorize library I found the bits of code on a previous post.
Many thanks once again for the very nice blog posts you have and for your reply to my queries.
Cheers,
Bagi
February 20th, 2012 at 17:19
Hi!,
I have a question now. In the posting you mention that – “Note that no computation is actually performed in the server – it just manages the sharing for clients.”, However I see that the mp_factorizer(…) function is called both in the runserver() and the runclient() functions. As a result of this the server actually does all the computation with or without a client being run.
I think mp_factorizer(…) should only be called from the runclient() function. I commented out line 90 in the code you posted. Then I ran a ‘server’ in one terminal and a ‘client’ (by passing ‘client’ as a command-line argument) in a second terminal, and got the behaviour one would expect as described in the post.
What do you think?
Cheers,
Bagi
February 23rd, 2012 at 14:01
Bagi,
Yep, the full code is more general. The server may as well do something, right?
Just managing the queue doesn’t take much time from its CPU, so it can do some computations on its own.
As for the issues you hit. I think the first one is because dict comprehensions are new in 2.7, which is the version I’m using (and you should too!). The others – true, this is why I didn’t originally release the full source into the blog post. It builds on top of some utils I have lying around.
May 25th, 2012 at 16:31
Hi — Thank you so much for your tutorial!
Unfortunately, the pocoo pastebin has been shut down, so that your complete code is no longer available. Would it be possible for you to repost it, possibly as a gist?
Sorry for the trouble and thanks again!
Chestnut
May 27th, 2012 at 04:57
Chestnut,
Here it is: https://gist.github.com/2798682
July 18th, 2012 at 23:59
Hi,
I just wanted to chime in with a thank you!
This example is much more comprehensive than the RemoteManager one in the multiprocessing docs, not to mention that the docs version makes poor use of the serve_forever() method (e.g. it blocks the parent process) rather than using the manager’s start() and shutdown() methods.
September 4th, 2012 at 20:53
Hi,
Thank you for this tutorial.
I ran into a problem using the framework. The server machine starts to complain “too many connections” once I increase the number of processes running on the client machine beyond 500. Is there a limitation on the volume of inter-process communication? Is there a good way to debug this problem?
Thanks.
February 28th, 2013 at 07:29
Very nice tutorial! everything is clear!
great work!
thanks!