Python threads: communication and stopping
December 27th, 2011 at 5:42 amA very common doubt developers new to Python have is how to use its threads correctly. Specifically, a large amount of questions on StackOverflow show that people struggle most with two aspects:
- How to stop / kill a thread
- How to safely pass data to a thread and back
I already have a blog post touching on these issues right here, but I feel it’s too task-specific for sockets, and a more basic and general post would be appropriate. I assume the reader has a basic familiarity with Python threads, i.e. has at least went over the documentation.
So, without further ado, here’s a sample "worker" thread implementation. It can be given tasks, where each task is a directory name, and it does useful work. This work is recursively listing all the files contained in the given directory and its sub-directories.
import os, time
import threading, Queue
class WorkerThread(threading.Thread):
""" A worker thread that takes directory names from a queue, finds all
files in them recursively and reports the result.
Input is done by placing directory names (as strings) into the
Queue passed in dir_q.
Output is done by placing tuples into the Queue passed in result_q.
Each tuple is (thread name, dirname, [list of files]).
Ask the thread to stop by calling its join() method.
"""
def __init__(self, dir_q, result_q):
super(WorkerThread, self).__init__()
self.dir_q = dir_q
self.result_q = result_q
self.stoprequest = threading.Event()
def run(self):
# As long as we weren't asked to stop, try to take new tasks from the
# queue. The tasks are taken with a blocking 'get', so no CPU
# cycles are wasted while waiting.
# Also, 'get' is given a timeout, so stoprequest is always checked,
# even if there's nothing in the queue.
while not self.stoprequest.isSet():
try:
dirname = self.dir_q.get(True, 0.05)
filenames = list(self._files_in_dir(dirname))
self.result_q.put((self.name, dirname, filenames))
except Queue.Empty:
continue
def join(self, timeout=None):
self.stoprequest.set()
super(WorkerThread, self).join(timeout)
def _files_in_dir(self, dirname):
""" Given a directory name, yields the names of all files (not dirs)
contained in this directory and its sub-directories.
"""
for path, dirs, files in os.walk(dirname):
for file in files:
yield os.path.join(path, file)
For experienced programmers, going over the code and its documentation and comments should be enough. For folks with less experience, I will elaborate. Let’s address the two issues mentioned at the top of the post.
First, killing a thread. This is accomplished by politely asking the thread to die. The join method of Thread is overridden, and before calling the actual join of the parent class, it "sets" the self.stoprequest attribute, which is a threading.Event. The main loop in the thread’s run method checks this flag, and exits when it’s set. You can think of threading.Event as a synchronized boolean flag. Keep in mind that the join method is called in the context of the main thread, while the body of the run method is executed in the context of the worker thread.
Second, passing data into and out of a thread. This is best done with the help of Queue objects from the Queue module (yep, it’s Queue.Queue – a bit awkward, I agree. In Python 3 this was fixed and the module is named in lowercase – queue). When the worker thread is created, it is given a reference to one queue for input, and one queue for output. Queue objects can be safely shared between threads (any amount of threads, actually), and provide a synchronized FIFO queue interface.
Probably the most important part of the code to understand is these lines:
while not self.stoprequest.isSet():
try:
dirname = self.dir_q.get(True, 0.05)
... # do work
except Queue.Empty:
continue
This is a simple way to wait on two conditions simultaneously. The thread does work only if wasn’t asked to stop, and there’s work to do in the queue. Therefore, the Queue.get method is used as follows:
- Its blocking argument is set to true, meaning that the call will block the worker thread until there’s an item in the queue, or…
- The timeout argument is also set, meaning that get will block for at most timeout seconds. If no item appeared in the queue within this timeout, the method throws a Queue.Empty exception.
So what we have here is a way to both wait on the queue without wasting CPU cycles (get uses special OS services deep underneath to implement a non-spinning wait), and check the stoprequest event occasionally. There’s just one gotcha, however. If the work is taking long to perform, the next check of stoprequest may be delayed. If this is important for your application, consider doing the work in short chunks, checking stoprequest after each chunk.
Here’s some simple code that uses this worker thread. It creates a thread pool with 4 threads, feeds them work and waits for all the results to arrive:
def main(args):
# Create a single input and a single output queue for all threads.
dir_q = Queue.Queue()
result_q = Queue.Queue()
# Create the "thread pool"
pool = [WorkerThread(dir_q=dir_q, result_q=result_q) for i in range(4)]
# Start all threads
for thread in pool:
thread.start()
# Give the workers some work to do
work_count = 0
for dir in args:
if os.path.exists(dir):
work_count += 1
dir_q.put(dir)
print 'Assigned %s dirs to workers' % work_count
# Now get all the results
while work_count > 0:
# Blocking 'get' from a Queue.
result = result_q.get()
print 'From thread %s: %s files found in dir %s' % (
result[0], len(result[2]), result[1])
work_count -= 1
# Ask threads to die and wait for them to do it
for thread in pool:
thread.join()
if __name__ == '__main__':
import sys
main(sys.argv[1:])
All worker threads in the pool share the same input queue and output queue. There’s absolutely no problem with this. On the contrary, as you can see it enables a very simple implementation of a thread pool to be quite functional.
Please treat this sample as a template rather than a one-size-fits-all solution for every possible threading scenario. Concurrency is a complex topic, and there are a lot of tradeoffs involved in the design of even the simplest multi-threaded programs.
Finally, note that such worker threads in Python are only useful if the work they do is not CPU bound. The thread shown here is a good example – listing directories and files is mostly an I/O bound task (well, less so if you have a really fast SSD). Other good candidates are socket I/O, user interaction, and anything involving the web (i.e. fetching data from HTTP or RPC services).
CPU-bound tasks are not a good fit for Python threads, due to the Global Interpreter Lock (GIL). Parallel computations in Python should be done in multiple processes, not threads. In a future article I’ll discuss how to use the multiprocessing module to manage worker processes in a similar manner.
Related posts:

December 27th, 2011 at 09:27
I use the above pattern frequently with two changes. I use a sentinel to indicate that the worker should exit. The sentinel is usually None. This is because there is a Queue class in the multiprocessing package and a sentinel will work better for that than an Event. I can usually switch between threading and multiprocessing just by choosing which module to import Queue from.
The other thing is that I return a tuple of (exception, result). exception is None if no exception occurred. This allows me to reraise any exception that occurred in a worker.
December 27th, 2011 at 10:11
Roger,
Yes, a sentinel is a common alternative. It’s a bit more cumbersome to manage if you have multiple workers, but it doesn’t require an extra synchronized object.
December 27th, 2011 at 13:44
We can send as many sentinels as there are workers. The worker won’t get() anything after it received the sentinels, so every thread will get exactly one. This approach don’t require active waiting (a common source of nasty race conditions).
December 31st, 2011 at 05:20
Thanks for a good article with a realistic example. I which I would have had this example years ago when I was first learning threads.
First, I think it was a bad choice for the creator(s) of the threading.Thread class to use the name join().
It is the most unintuitive name.
I also don’t think it was a good idea for you to override the join method, just to add the call to stoprequest.set().
It makes much better sense to create a new method like “stopThread”, since that is what you’re trying to do.
I look forward to your article on multiprocessing.
I wish there was a way to subscribe, so that i would be notified when it is published.
December 31st, 2011 at 06:21
Would you add links to all future posts so that we can download the source code?
thanks
December 31st, 2011 at 06:49
ThreadNewbie,
I think you’re missing a piece of historic context here. “Join” has been used for waiting on threads & processes to end for many years. It means “wait until the thread dies”, without explicitly killing it. I chose to override the same method, although as you say it’s possible to create another method. Then, it would take a call to that new method (
ask_thread_to_stop()or something) followed byjoin.As for subscribing, there’s RSS for that. http://eli.thegreenplace.net/feed/
January 19th, 2012 at 17:28
I am also a starter with threads and I have a question:
Why do you get the results of the threads before they are terminated. Could it not be possible that you get the results from a thread before it is completly finished?
January 19th, 2012 at 20:51
Patrick,
What do you mean? I don’t think I fully understand your question.
Note that
joindoesn’t kill the thread, it just waits (and blocks) until the thread has finished running.January 10th, 2013 at 10:34
Nice. Thank you for posting this. I will definitely check to your site to read more and tell my friends about your site.
February 3rd, 2013 at 07:28
Interesting read, thanks. Not a question about threading… but why implement _files_in_dir as a generator here if you’re going to forcibly evaluate it as soon as it’s called?
February 6th, 2013 at 05:24
@Simon,
How would you implement it? I like using
yieldfor simple code like this because it makes the function succinct. The alternative is to manually collect a list and return it, but why do that?February 9th, 2013 at 03:43
I guess I’ve only used generators in such a situation when being careful of very large data sets. But you’re right, it’s simpler and tidier than creating an empty list and collecting each piece of data. Thanks.