Tags Python

When creating a GUI that has to communicate with the outer world, a common stumbling block that comes up is how to combine GUI code with I/O. I/O, whether HTTP requests, RPC protocols, plain socket communication or the serial port, tends to be blocking in nature, which doesn't play well with GUI code. No one wants their GUI to "freeze" while the program is blocking on a read call from a socket.

There are many solutions to this issue, the two most common of which are:

  1. Doing the I/O in a separate thread
  2. Using asynchronous I/O with callbacks integrated into the GUI event loop

In my opinion, option 1 is the simpler of the two, and it's the one I usually end up with. Here I want to present a simple code sample that implements a socket client thread in Python. Although this class is general enough to be used in many scenarios, I see it more as a pattern than as a completed black-box. Networking code tends to depend on a lot of factors, and it's easy to modify this sample to various scenarios. For example, while this is a client, re-implementing a similar server is easy. Without further ado, here's the code:

import socket
import struct
import threading
import Queue


class ClientCommand(object):
    """ A command to the client thread.
        Each command type has its associated data:

        CONNECT:    (host, port) tuple
        SEND:       Data string
        RECEIVE:    None
        CLOSE:      None
    """
    CONNECT, SEND, RECEIVE, CLOSE = range(4)

    def __init__(self, type, data=None):
        self.type = type
        self.data = data


class ClientReply(object):
    """ A reply from the client thread.
        Each reply type has its associated data:

        ERROR:      The error string
        SUCCESS:    Depends on the command - for RECEIVE it's the received
                    data string, for others None.
    """
    ERROR, SUCCESS = range(2)

    def __init__(self, type, data=None):
        self.type = type
        self.data = data


class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are
        placed in the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=None, reply_q=None):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q or Queue.Queue()
        self.reply_q = reply_q or Queue.Queue()
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)

    def _handle_CONNECT(self, cmd):
        try:
            self.socket = socket.socket(
                socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((cmd.data[0], cmd.data[1]))
            self.reply_q.put(self._success_reply())
        except IOError as e:
            self.reply_q.put(self._error_reply(str(e)))

    def _handle_CLOSE(self, cmd):
        self.socket.close()
        reply = ClientReply(ClientReply.SUCCESS)
        self.reply_q.put(reply)

    def _handle_SEND(self, cmd):
        header = struct.pack('<L', len(cmd.data))
        try:
            self.socket.sendall(header + cmd.data)
            self.reply_q.put(self._success_reply())
        except IOError as e:
            self.reply_q.put(self._error_reply(str(e)))

    def _handle_RECEIVE(self, cmd):
        try:
            header_data = self._recv_n_bytes(4)
            if len(header_data) == 4:
                msg_len = struct.unpack('<L', header_data)[0]
                data = self._recv_n_bytes(msg_len)
                if len(data) == msg_len:
                    self.reply_q.put(self._success_reply(data))
                    return
            self.reply_q.put(self._error_reply('Socket closed prematurely'))
        except IOError as e:
            self.reply_q.put(self._error_reply(str(e)))

    def _recv_n_bytes(self, n):
        """ Convenience method for receiving exactly n bytes from
            self.socket (assuming it's open and connected).
        """
        data = ''
        while len(data) < n:
            chunk = self.socket.recv(n - len(data))
            if chunk == '':
                break
            data += chunk
        return data

    def _error_reply(self, errstr):
        return ClientReply(ClientReply.ERROR, errstr)

    def _success_reply(self, data=None):
        return ClientReply(ClientReply.SUCCESS, data)

SocketClientThread is the main class here. It's a Python thread that can be started and terminated (joined), and communicated with by passing it commands and getting back replies. ClientCommand and ClientReply are simple data classes to encapsulate these commands and replies.

This code, while simple, demonstrates many patterns in Python threading and networking code. Here's a brief description of some points of interest, in no particular order:

  • The standard Queue.Queue is used to pass data between the thread and the user code. Queue is a great tool in a Python programmer's toolbox - I use it all the time to decouple multi-threaded code. The biggest difficulty in writing multi-threaded programs is protecting shared data. A Queue makes this a non-issue, essentially transforming the sharing model into message passing, which is much simpler to use safely.

    You will notice that SocketClientThread uses two queues, one for getting commands from the main thread, the other for passing replies. This is a common idiom that works well for most scenarios.

  • In general, you can't force a thread to die in Python. If you need to manually terminate threads, they have to agree to die. The alive attribute of SocketClientThread demonstrates one common and safe way to achieve it. alive is a threading.Event - a thread-safe flag that can be cleared in the main thread by calling alive.clear() (which is done in the join method). The communication thread occasionally checks if this flag is still set and if not, it exits gracefully.

    There is a very important implementation detail here. Note how the thread's run method is implemented. The loop runs while alive is set, but to actually be able to execute this test, the loop can't block. So pulling commands from the command queue is done with get(True, 0.1), which means the action is blocking but with a 100 millisecond timeout. This has two benefits: on one hand, it doesn't block indefinitely, and at most 100 ms will pass until the thread notices that its alive flag is clear. On the other hand, since this does block for 100 ms, the thread doesn't just spin on the CPU while waiting for commands. In fact, its CPU utilization is negligible. Note that the thread can still block and refuse to die if it's waiting on the socket's recv with no data coming in.

  • SocketClientThread uses a TCP socket, which will transmit all data faithfully, but can do so in chunks of unpredictable size. This requires to delimit the messages somehow, to let the other side know when a message begins and ends. I'm using the length prefix technique here. Before a message is sent, its length is sent as a packed 4-byte little-endian integer. When a message is received, first 4 bytes are received to unpack the length, and then the actual message can be received since we now know how long it is.

  • For the same reason as stated in the previous bullet, some care must be taken when sending and receiving data on a TCP socket. Under network load, it's not guaranteed that it will actually send or receive all the bytes you expected in one try. To handle this potential problem while sending, Python provides the socket.sendall function. When receiving, it's just a bit more tricky, requiring to loop on recv until the correct amount of bytes has been received.

To show an example of how to use SocketClientThread, this code also contains a sample GUI implemented with PyQt. This GUI uses the client thread to connect to a server (by default localhost:50007), send "hello" and wait for a reply. In the mean time, the GUI keeps painting a pretty circle animation to demonstrate it's not blocked by the socket operations. To achieve this effect, the GUI employs yet another interesting idiom - a timer which is used to periodically check if SocketClientThread placed new data in its reply queue, by calling reply_q.get(block=False). This timer + non-blocking get combination allows effective communication between the thread and the GUI.

I hope this code sample will be useful to others. If you have any questions about it, don't hesitate to ask in a comment or send me an email.

P.S. As almost all samples posted here, this code is in the public domain.