Python threads: communication and stopping

December 27th, 2011 at 5:42 am

A very common question from developers new to Python is how to use its threads correctly. Specifically, a large number of questions on StackOverflow show that people struggle most with two aspects:

  1. How to stop / kill a thread
  2. How to safely pass data to a thread and back

I already have a blog post touching on these issues right here, but I feel it’s too specific to sockets, and a more basic and general post would be appropriate. I assume the reader has a basic familiarity with Python threads, i.e. has at least gone over the documentation.

So, without further ado, here’s a sample "worker" thread implementation. It can be given tasks, where each task is a directory name, and for each one it does useful work: recursively listing all the files contained in that directory and its sub-directories.

import os, time
import threading, Queue

class WorkerThread(threading.Thread):
    """ A worker thread that takes directory names from a queue, finds all
        files in them recursively and reports the result.

        Input is done by placing directory names (as strings) into the
        Queue passed in dir_q.

        Output is done by placing tuples into the Queue passed in result_q.
        Each tuple is (thread name, dirname, [list of files]).

        Ask the thread to stop by calling its join() method.
    """
    def __init__(self, dir_q, result_q):
        super(WorkerThread, self).__init__()
        self.dir_q = dir_q
        self.result_q = result_q
        self.stoprequest = threading.Event()

    def run(self):
        # As long as we weren't asked to stop, try to take new tasks from the
        # queue. The tasks are taken with a blocking 'get', so no CPU
        # cycles are wasted while waiting.
        # Also, 'get' is given a timeout, so stoprequest is always checked,
        # even if there's nothing in the queue.
        while not self.stoprequest.isSet():
            try:
                dirname = self.dir_q.get(True, 0.05)
                filenames = list(self._files_in_dir(dirname))
                self.result_q.put((self.name, dirname, filenames))
            except Queue.Empty:
                continue

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerThread, self).join(timeout)

    def _files_in_dir(self, dirname):
        """ Given a directory name, yields the names of all files (not dirs)
            contained in this directory and its sub-directories.
        """
        for path, dirs, files in os.walk(dirname):
            for file in files:
                yield os.path.join(path, file)

For experienced programmers, going over the code and its documentation and comments should be enough. For folks with less experience, I will elaborate. Let’s address the two issues mentioned at the top of the post.

First, killing a thread. This is accomplished by politely asking the thread to die. The join method of Thread is overridden, and before calling the actual join of the parent class, it "sets" the self.stoprequest attribute, which is a threading.Event. The main loop in the thread’s run method checks this flag, and exits when it’s set. You can think of threading.Event as a synchronized boolean flag. Keep in mind that the join method is called in the context of the main thread, while the body of the run method is executed in the context of the worker thread.
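
If threading.Event is new to you, here’s a minimal standalone sketch of such a flag in action (stop_flag and loop are just illustrative names, not part of the worker above):

import threading, time

stop_flag = threading.Event()      # starts out cleared (False)

def loop():
    while not stop_flag.isSet():   # the same check the worker's run() performs
        time.sleep(0.1)            # stand-in for a chunk of real work

t = threading.Thread(target=loop)
t.start()
time.sleep(0.5)                    # let the loop spin for a while
stop_flag.set()                    # ask the loop to exit; safe to call from any thread
t.join()                           # wait until the loop actually returns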

Second, passing data into and out of a thread. This is best done with the help of Queue objects from the Queue module (yep, it’s Queue.Queue – a bit awkward, I agree. In Python 3 this was fixed and the module is named in lowercase – queue). When the worker thread is created, it is given a reference to one queue for input, and one queue for output. Queue objects can be safely shared between threads (any amount of threads, actually), and provide a synchronized FIFO queue interface.
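
Here’s a minimal standalone sketch (separate from the worker above, with illustrative names) of two threads sharing a Queue, one putting items in and the other taking them out:

import threading, Queue

q = Queue.Queue()

def producer():
    for i in range(5):
        q.put(i)                   # put() is safe to call from any thread

def consumer():
    for _ in range(5):
        item = q.get()             # blocks until an item is available
        print 'got', item

threads = [threading.Thread(target=producer),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()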

Probably the most important part of the code to understand is these lines:

while not self.stoprequest.isSet():
    try:
        dirname = self.dir_q.get(True, 0.05)
        ... # do work
    except Queue.Empty:
        continue

This is a simple way to wait on two conditions simultaneously. The thread does work only if it wasn’t asked to stop and there’s work to do in the queue. Therefore, the Queue.get method is used as follows:

  • Its block argument is set to True, meaning that the call will block the worker thread until there’s an item in the queue, or…
  • Its timeout argument is also set, meaning that get will block for at most timeout seconds. If no item appears in the queue within this timeout, the method raises a Queue.Empty exception.

So what we have here is a way to both wait on the queue without wasting CPU cycles (get uses special OS services deep underneath to implement a non-spinning wait) and check the stoprequest event periodically. There’s just one gotcha, however: if a single piece of work takes a long time to perform, the next check of stoprequest is delayed until it finishes. If this matters for your application, consider doing the work in short chunks, checking stoprequest after each chunk, as in the sketch below.
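
Here’s roughly what such a run method could look like; _split_into_chunks and _process_chunk are hypothetical helpers standing in for whatever chunking makes sense for your task:

def run(self):
    while not self.stoprequest.isSet():
        try:
            task = self.dir_q.get(True, 0.05)
        except Queue.Empty:
            continue
        # Check stoprequest between chunks, so a long task can't delay
        # the response to a stop request for too long.
        for chunk in self._split_into_chunks(task):   # hypothetical helper
            if self.stoprequest.isSet():
                break
            self._process_chunk(chunk)                # hypothetical helper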

Here’s some simple code that uses this worker thread. It creates a thread pool with 4 threads, feeds them work and waits for all the results to arrive:

def main(args):
    # Create a single input and a single output queue for all threads.
    dir_q = Queue.Queue()
    result_q = Queue.Queue()

    # Create the "thread pool"
    pool = [WorkerThread(dir_q=dir_q, result_q=result_q) for i in range(4)]

    # Start all threads
    for thread in pool:
        thread.start()

    # Give the workers some work to do
    work_count = 0
    for dir in args:
        if os.path.exists(dir):
            work_count += 1
            dir_q.put(dir)

    print 'Assigned %s dirs to workers' % work_count

    # Now get all the results
    while work_count > 0:
        # Blocking 'get' from a Queue.
        result = result_q.get()
        print 'From thread %s: %s files found in dir %s' % (
            result[0], len(result[2]), result[1])
        work_count -= 1

    # Ask threads to die and wait for them to do it
    for thread in pool:
        thread.join()


if __name__ == '__main__':
    import sys
    main(sys.argv[1:])

All worker threads in the pool share the same input queue and the same output queue. There’s absolutely no problem with this; on the contrary, as you can see, it allows a very simple thread pool implementation to be quite functional.

Please treat this sample as a template rather than a one-size-fits-all solution for every possible threading scenario. Concurrency is a complex topic, and there are a lot of tradeoffs involved in the design of even the simplest multi-threaded programs.

Finally, note that such worker threads in Python are only useful if the work they do is not CPU bound. The thread shown here is a good example – listing directories and files is mostly an I/O bound task (well, less so if you have a really fast SSD). Other good candidates are socket I/O, user interaction, and anything involving the web (i.e. fetching data from HTTP or RPC services).

CPU-bound tasks are not a good fit for Python threads, due to the Global Interpreter Lock (GIL). Parallel computations in Python should be done in multiple processes, not threads. In a future article I’ll discuss how to use the multiprocessing module to manage worker processes in a similar manner.
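
As the briefest of sketches of what that looks like (the details are left for that article; heavy_computation is just an illustrative stand-in), multiprocessing.Pool can spread a CPU-bound function over several processes:

import multiprocessing

def heavy_computation(n):
    # Stand-in for CPU-bound work; each call runs in a separate process,
    # so the GIL doesn't serialize the computation.
    return sum(i * i for i in xrange(n))

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = pool.map(heavy_computation, [1000000] * 8)
    pool.close()
    pool.join()
    print results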

13 Responses to “Python threads: communication and stopping”

  1. Roger Says:

    I use the above pattern frequently with two changes. I use a sentinel to indicate that the worker should exit. The sentinel is usually None. This is because there is a Queue class in the multiprocessing package and a sentinel will work better for that than an Event. I can usually switch between threading and multiprocessing just by choosing which module to import Queue from.

    The other thing is that I return a tuple of (exception, result). exception is None if no exception occurred. This allows me to reraise any exception that occurred in a worker.

  2. eliben Says:

    Roger,

    Yes, a sentinel is a common alternative. It’s a bit more cumbersome to manage if you have multiple workers, but it doesn’t require an extra synchronized object.

  3. Tadeas Moravec Says:

    We can send as many sentinels as there are workers. A worker won’t get() anything more after it receives a sentinel, so every thread will get exactly one. This approach doesn’t require active waiting (a common source of nasty race conditions).
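
    A rough sketch of this sentinel-per-worker approach (with trivial stand-in work and illustrative names):

    import threading, Queue

    def worker(task_q, result_q):
        while True:
            task = task_q.get()          # plain blocking get; no timeout needed
            if task is None:             # the sentinel: time to exit
                break
            result_q.put(task * 2)       # stand-in for real work

    task_q, result_q = Queue.Queue(), Queue.Queue()
    num_workers = 4
    threads = [threading.Thread(target=worker, args=(task_q, result_q))
               for _ in range(num_workers)]
    for t in threads:
        t.start()

    for task in range(10):               # enqueue the real tasks
        task_q.put(task)
    for _ in range(num_workers):         # then one sentinel per worker
        task_q.put(None)

    for t in threads:
        t.join()                         # each worker exits after consuming one sentinel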

  4. ThreadNewbie Says:

    Thanks for a good article with a realistic example. I wish I had had this example years ago when I was first learning threads.

    First, I think it was a bad choice for the creator(s) of the threading.Thread class to use the name join().
    It is the most unintuitive name.

    I also don’t think it was a good idea for you to override the join method, just to add the call to stoprequest.set().

    It makes much better sense to create a new method like “stopThread”, since that is what you’re trying to do.

    I look forward to your article on multiprocessing.
    I wish there was a way to subscribe, so that I would be notified when it is published.

  5. ThreadNewbie Says:

    Would you add links to all future posts so that we can download the source code?

    thanks

  6. eliben Says:

    ThreadNewbie,

    I think you’re missing a piece of historic context here. “Join” has been used for waiting on threads & processes to end for many years. It means “wait until the thread dies”, without explicitly killing it. I chose to override the same method, although as you say it’s possible to create another method. Then, it would take a call to that new method (ask_thread_to_stop() or something) followed by join.

    As for subscribing, there’s RSS for that. http://eli.thegreenplace.net/feed/

  7. Patrick Says:

    I am also a starter with threads and I have a question:

    Why do you get the results of the threads before they are terminated? Couldn’t it be possible that you get the results from a thread before it is completely finished?

  8. eliben Says:

    Patrick,

    What do you mean? I don’t think I fully understand your question.

    Note that join doesn’t kill the thread, it just waits (and blocks) until the thread has finished running.

  9. hcg levels Says:

    Nice. Thank you for posting this. I will definitely check to your site to read more and tell my friends about your site.

  10. simon Says:

    Interesting read, thanks. Not a question about threading… but why implement _files_in_dir as a generator here if you’re going to forcibly evaluate it as soon as it’s called?

  11. eliben Says:

    @Simon,

    How would you implement it? I like using yield for simple code like this because it makes the function succinct. The alternative is to manually collect a list and return it, but why do that?

  12. simon Says:

    I guess I’ve only used generators in such a situation when being careful of very large data sets. But you’re right, it’s simpler and tidier than creating an empty list and collecting each piece of data. Thanks.

  13. Mark Mikofski Says:

    Excellent post! I was banging my head against the wall trying to figure out what was blocking when I realized that it was Queue’s get method. I finally implemented a similar solution to yours here, but initially I used get_nowait() in a try block, with pass on Queue.Empty. I didn’t know about the low-level OS services that keep the CPU from spinning, but I thought that there might be something like it – Tk has after_idle() – so after reading this post I changed mine to continue after a timeout like yours. I start urlretrieve on a thread, passing it a reporthook that puts its output into the queue. The main thread gets the thread’s output from the queue and writes it to stdout or wherever I want – e.g. a ttk.Progressbar. It quits when the thread’s is_alive() method returns False, so I didn’t implement the Event as you did. Thanks again!
