In the previous post, I discussed how the multiprocessing package can be used to run CPU-bound computation tasks in parallel on a multi-core machine. But the utility of multiprocessing doesn't end here. It can also be used to run computations distributed over several machines.

This takes us into the exciting domain of distributed computing. There are many tools available for addressing various aspects of this domain, but here I want to focus specifically on what Python offers right in the standard library, with multiprocessing. The part of the package that makes distributed computing possible is called "managers".

The documentation of multiprocessing.managers leaves something to be desired. It's not entirely clear what the full capabilities of this tool are from just skimming the docs. For example, it starts by saying:

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

This is somewhat confusing, since multiprocessing already has synchronization primitives available without managers (for example Value and Lock). So why are managers needed?

For two main reasons:

  1. Managers provide additional synchronization tools, such as a list or a dictionary that can be shared between processes (see the sketch right after this list).
  2. Managers allow their synchronized objects to be shared by processes running across a network, not just on the same machine. This is why managers also provide a Lock, which at first sight appears to duplicate multiprocessing.Lock. It isn't a duplication: multiprocessing.Lock is only available to processes running on the same machine, while multiprocessing.SyncManager.Lock can be shared across machines (which is also why it's slower).
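
To get a quick taste of the first point, here's a small sketch of my own (not from the original post) of a dict and a Lock shared between local processes through a manager:

import multiprocessing

def increment(d, lock):
    """ Increment a counter in the shared dict. All access goes through
        the manager's proxy objects.
    """
    with lock:
        d['count'] = d.get('count', 0) + 1

if __name__ == '__main__':
    manager = multiprocessing.Manager()  # starts a SyncManager server process
    d = manager.dict()
    lock = manager.Lock()
    procs = [multiprocessing.Process(target=increment, args=(d, lock))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print d['count']  # 4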

I don't want to delve too far into the synchronization primitives, and instead focus on the distributed computing made possible by managers.

The task will be the same as before - factoring lists of numbers. The worker function is:

import Queue  # only needed for the Queue.Empty exception raised by get_nowait

def factorizer_worker(job_q, result_q):
    """ A worker function to be launched in a separate process. Takes jobs from
        job_q - each job a list of numbers to factorize. When the job is done,
        the result (dict mapping number -> list of factors) is placed into
        result_q. Runs until job_q is empty.
    """
    while True:
        try:
            job = job_q.get_nowait()
            outdict = {n: factorize_naive(n) for n in job}
            result_q.put(outdict)
        except Queue.Empty:
            return

This worker runs in a single process on some machine. All it cares about is that it has a queue of "jobs" to look at, and a queue of results to write into. Each job in the queue is a list of numbers to factorize. Once the worker has finished factoring these numbers, it puts a result dict into the result queue. The worker stops and returns when it notices that the job queue is empty. That's it.
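
For completeness, factorize_naive is the trial-division factorizer from the previous post. A minimal stand-in looks something like this:

def factorize_naive(n):
    """ Naive trial-division factorization. Takes an integer and returns
        the list of its prime factors (with multiplicity).
    """
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)
    return factors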

Adding an abstraction level, here's how this worker can be used:

def mp_factorizer(shared_job_q, shared_result_q, nprocs):
    """ Split the work with jobs in shared_job_q and results in
        shared_result_q into several processes. Launch each process with
        factorizer_worker as the worker function, and wait until all are
        finished.
    """
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
                target=factorizer_worker,
                args=(shared_job_q, shared_result_q))
        procs.append(p)
        p.start()

    for p in procs:
        p.join()

mp_factorizer takes the same pair of queues and the number of processes to create. It then uses multiprocessing.Process to spawn several workers, each into a process of its own. When all the workers are done, mp_factorizer exits. Note how this code is still independent of where it actually executes - its interface with the world is via the job and result queues.
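
For instance, here's a hypothetical single-machine driver (not part of the post's code) that exercises mp_factorizer with plain multiprocessing queues:

import multiprocessing

if __name__ == '__main__':
    job_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()

    # Fill the job queue before launching the workers, since a worker
    # exits as soon as it finds the queue empty.
    for chunk in [[3, 9, 21], [101, 103, 999999999999]]:
        job_q.put(chunk)

    mp_factorizer(job_q, result_q, nprocs=2)

    # All workers have been joined at this point, so it's safe to drain
    # the result queue.
    while not result_q.empty():
        print result_q.get()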

There's nothing really new here, so let's get to the interesting stuff - starting a server that manages the shared queues:

def runserver():
    # Start a shared manager server and access its queues
    manager = make_server_manager(PORTNUM, AUTHKEY)
    shared_job_q = manager.get_job_q()
    shared_result_q = manager.get_result_q()

    N = 999
    nums = make_nums(N)

    # The numbers are split into chunks. Each chunk is pushed into the job
    # queue.
    chunksize = 43
    for i in range(0, len(nums), chunksize):
        shared_job_q.put(nums[i:i + chunksize])

    # Wait until all results are ready in shared_result_q
    numresults = 0
    resultdict = {}
    while numresults < N:
        outdict = shared_result_q.get()
        resultdict.update(outdict)
        numresults += len(outdict)

    # Sleep a bit before shutting down the server - to give clients time to
    # realize the job queue is empty and exit in an orderly way.
    time.sleep(2)
    manager.shutdown()

What this code does is:

  1. Create the manager (which actually starts the server running in the background) - more on this step later
  2. Generate some input numbers and break them into chunks
  3. Feed the job queue with chunks of numbers for the workers to churn on
  4. Wait until the expected number of results has been placed in the result queue
  5. Shut down the server and exit

Note that no computation is actually performed in the server - it just manages the sharing for clients. And this is make_server_manager:

from multiprocessing.managers import SyncManager

def make_server_manager(port, authkey):
    """ Create a manager for the server, listening on the given port.
        Return a manager object with get_job_q and get_result_q methods.
    """
    job_q = Queue.Queue()
    result_q = Queue.Queue()

    # This is based on the examples in the official docs of multiprocessing.
    # get_{job|result}_q return synchronized proxies for the actual Queue
    # objects.
    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

I won't explain what each line of this code does - it's all covered on the documentation page of multiprocessing. I'll just note that the manager starts a TCP server on the given port, running in the background, and uses this server to let clients access its internal objects - in this case a couple of queues.
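
To make the proxy aspect concrete, here's a hypothetical interactive session against such a server (PORTNUM and AUTHKEY stand for whatever values you picked):

manager = make_server_manager(PORTNUM, AUTHKEY)
job_q = manager.get_job_q()
print type(job_q)    # something like <class 'multiprocessing.managers.AutoProxy[get_job_q]'>
job_q.put([42])      # forwarded over the connection to the real queue
print job_q.qsize()  # 1
manager.shutdown()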

Finally, to complete the puzzle, here's the make_nums utility function. Nothing smart about it:

def make_nums(N):
    """ Create a list of N large numbers to factorize.
    """
    nums = [999999999999]
    for i in xrange(N - 1):
        nums.append(nums[-1] + 2)
    return nums

Alright, so this is the server. It will run, put input into the job queue and then wait for results to start trickling into the result queue. How would they get there though? From clients. Here's a simple client:

def runclient():
    manager = make_client_manager(IP, PORTNUM, AUTHKEY)
    job_q = manager.get_job_q()
    result_q = manager.get_result_q()
    mp_factorizer(job_q, result_q, 4)

The client accesses the server by means of another manager object. It then asks for the queues and just runs mp_factorizer (with nprocs=4). The client's manager is this:

def make_client_manager(ip, port, authkey):
    """ Create a manager for a client. This manager connects to a server on the
        given address and exposes the get_job_q and get_result_q methods for
        accessing the shared queues from the server.
        Return a manager object.
    """
    class ServerQueueManager(SyncManager):
        pass

    ServerQueueManager.register('get_job_q')
    ServerQueueManager.register('get_result_q')

    manager = ServerQueueManager(address=(ip, port), authkey=authkey)
    manager.connect()

    print 'Client connected to %s:%s' % (ip, port)
    return manager

This manager is simpler. Instead of starting a server, it connects to one (given an IP address, port and authorization key). Note that get_job_q and get_result_q are registered here without a callable - the client only registers the names, to let the manager know they are part of the protocol; the actual queues live on the server. Think of it as a kind of RPC.

This client can be executed on the same machine as the server, or on a different machine, which can be located anywhere as long as it can reach the server by IP address. It will connect to the server and start pulling work from the job queue, placing results into the result queue. Theoretically, any number of clients can connect simultaneously. The beauty of this method is that it only uses the Python standard library, so the code is very much platform independent. I had a Windows client machine connecting to a Linux server, which also had a client of its own running, all happily sharing the work. It just works.

To summarize, I want to stress once again the goal of this post. Lest there be any misunderstanding, I'm not claiming this is the best way to do distributed programming in Python. It wouldn't be easy to find the "best way", since it's a complex problem domain with many tradeoffs, and many solutions that optimize for different things.

However, it is useful to know that such capabilities exist in the Python standard library. The multiprocessing package provides many useful building blocks. These can be used together or separately to implement all kinds of interesting solutions, both for parallelizing work across multiple processes and for distributing it across different machines. All of this, as you saw above, without writing too much code.

Code

The code for this post is available on GitHub.