Concurrent Servers: Part 4 - libuv



This is part 4 of a series of posts on writing concurrent network servers. In this part we're going to use libuv to rewrite our server once again, and also talk about handling time-consuming tasks in callbacks using a thread pool. Finally, we're going to look under the hood of libuv for a bit to study how it wraps blocking file-system operations with an asynchronous API.

All posts in the series:

Abstracting away event-driven loops with libuv

In part 3, we've seen how similar select-based and epoll-based servers are, and I mentioned it's very tempting to abstract away the minor differences between them. Numerous libraries are already doing this, however, so in this part I'm going to pick one and use it. The library I'm picking is libuv, which was originally designed to serve as the underlying portable platform layer for Node.js, and has since found use in additional projects. libuv is written in C, which makes it highly portable and very suitable for tying into high-level languages like JavaScript and Python.

While libuv has grown to be a fairly large framework for abstracting low-level platform details, it remains centered on the concept of an event loop. In our event-driven servers in part 3, the event loop was explicit in the main function; when using libuv, the loop is usually hidden inside the library itself, and user code just registers event handlers (as callback functions) and runs the loop. Furthermore, libuv will use the fastest event loop implementation for a given platform: for Linux this is epoll, etc.

libuv loop

libuv supports multiple event loops, and thus an event loop is a first class citizen within the library; it has a handle - uv_loop_t, and functions for creating/destroying/starting/stopping loops. That said, I will only use the "default" loop in this post, which libuv makes available via uv_default_loop(); multiple loops are mostly useful for multi-threaded event-driven servers, a more advanced topic I'll leave for future parts in the series.
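
To make the "loop as a handle" idea concrete, here's a minimal sketch (not taken from the server sample) that contrasts the default loop with an explicitly managed one; error checking is omitted for brevity:

#include <uv.h>

int main() {
  // Option 1: the default loop, which is what this post uses throughout.
  // With no handles registered, uv_run returns immediately.
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  uv_loop_close(uv_default_loop());

  // Option 2: a loop we create, run and tear down ourselves.
  uv_loop_t loop;
  uv_loop_init(&loop);
  uv_run(&loop, UV_RUN_DEFAULT);
  uv_loop_close(&loop);

  return 0;
}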

A concurrent server using libuv

To get a better feel for libuv, let's jump to our trusty protocol server that we've been vigorously reimplementing throughout the series. The structure of this server is going to be somewhat similar to the select and epoll-based servers of part 3, since it also relies on callbacks. The full code sample is here; we start with setting up the server socket bound to a local port:

int portnum = 9090;
if (argc >= 2) {
  portnum = atoi(argv[1]);
}
printf("Serving on port %d\n", portnum);

int rc;
uv_tcp_t server_stream;
if ((rc = uv_tcp_init(uv_default_loop(), &server_stream)) < 0) {
  die("uv_tcp_init failed: %s", uv_strerror(rc));
}

struct sockaddr_in server_address;
if ((rc = uv_ip4_addr("0.0.0.0", portnum, &server_address)) < 0) {
  die("uv_ip4_addr failed: %s", uv_strerror(rc));
}

if ((rc = uv_tcp_bind(&server_stream, (const struct sockaddr*)&server_address, 0)) < 0) {
  die("uv_tcp_bind failed: %s", uv_strerror(rc));
}

Fairly standard socket fare here, except that it's all wrapped in libuv APIs. In return we get a portable interface that should work on any platform libuv supports.

This code also demonstrates conscientious error handling; most libuv functions return an integer status, with a negative number meaning an error. In our server we treat these errors as fatal, but one can imagine a more graceful recovery.

Now that the socket is bound, it's time to listen on it. Here we run into our first callback registration:

// Listen on the socket for new peers to connect. When a new peer connects,
// the on_peer_connected callback will be invoked.
if ((rc = uv_listen((uv_stream_t*)&server_stream, N_BACKLOG, on_peer_connected)) < 0) {
  die("uv_listen failed: %s", uv_strerror(rc));
}

uv_listen registers a callback that the event loop will invoke when new peers connect to the socket. Our callback here is called on_peer_connected, and we'll examine it soon.

Finally, main runs the libuv loop until it's stopped (uv_run only returns when the loop has stopped or some error occurred).

// Run the libuv event loop.
uv_run(uv_default_loop(), UV_RUN_DEFAULT);

// If uv_run returned, close the default loop before exiting.
return uv_loop_close(uv_default_loop());

Note that only a single callback was registered by main prior to running the event loop; we'll soon see how additional callbacks are added. It's not a problem to add and remove callbacks throughout the runtime of the event loop - in fact, this is how most servers are expected to be written.

This is on_peer_connected, which handles new client connections to the server:

void on_peer_connected(uv_stream_t* server_stream, int status) {
  if (status < 0) {
    fprintf(stderr, "Peer connection error: %s\n", uv_strerror(status));
    return;
  }

  // client will represent this peer; it's allocated on the heap and only
  // released when the client disconnects. The client holds a pointer to
  // peer_state_t in its data field; this peer state tracks the protocol state
  // with this client throughout interaction.
  uv_tcp_t* client = (uv_tcp_t*)xmalloc(sizeof(*client));
  int rc;
  if ((rc = uv_tcp_init(uv_default_loop(), client)) < 0) {
    die("uv_tcp_init failed: %s", uv_strerror(rc));
  }
  client->data = NULL;

  if (uv_accept(server_stream, (uv_stream_t*)client) == 0) {
    struct sockaddr_storage peername;
    int namelen = sizeof(peername);
    if ((rc = uv_tcp_getpeername(client, (struct sockaddr*)&peername,
                                 &namelen)) < 0) {
      die("uv_tcp_getpeername failed: %s", uv_strerror(rc));
    }
    report_peer_connected((const struct sockaddr_in*)&peername, namelen);

    // Initialize the peer state for a new client: we start by sending the peer
    // the initial '*' ack.
    peer_state_t* peerstate = (peer_state_t*)xmalloc(sizeof(*peerstate));
    peerstate->state = INITIAL_ACK;
    peerstate->sendbuf[0] = '*';
    peerstate->sendbuf_end = 1;
    peerstate->client = client;
    client->data = peerstate;

    // Enqueue the write request to send the ack; when it's done,
    // on_wrote_init_ack will be called. The peer state is passed to the write
    // request via the data pointer; the write request does not own this peer
    // state - it's owned by the client handle.
    uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
    uv_write_t* req = (uv_write_t*)xmalloc(sizeof(*req));
    req->data = peerstate;
    if ((rc = uv_write(req, (uv_stream_t*)client, &writebuf, 1,
                       on_wrote_init_ack)) < 0) {
      die("uv_write failed: %s", uv_strerror(rc));
    }
  } else {
    uv_close((uv_handle_t*)client, on_client_closed);
  }
}

This code is well commented, but there are a couple of important libuv idioms I'd like to highlight:

  • Passing custom data into callbacks: since C has no closures, this can be challenging. libuv has a void* data field in all its handle types; these fields can be used to pass user data. For example, note how client->data is made to point to a peer_state_t structure so that the callbacks registered by uv_write and uv_read_start can know which peer data they're dealing with.
  • Memory management: event-driven programming is much easier in languages with garbage collection, because callbacks usually run in a completely different stack frame from where they were registered, making stack-based memory management difficult. It's almost always necessary to pass heap-allocated data to libuv callbacks (except in main, which remains alive on the stack while all the callbacks run), and to avoid leaks much care is required about when this data is safe to free(). This is something that comes with a bit of practice [1]; a minimal sketch of the cleanup idiom is shown right after this list.
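
For illustration, this is roughly what the close callback registered with uv_close can look like; it's the point where it's finally safe to release everything a client owns. This is a sketch of the idiom, not a verbatim copy of the on_client_closed handler in the full sample, which may differ in its details:

void on_client_closed(uv_handle_t* handle) {
  uv_tcp_t* client = (uv_tcp_t*)handle;
  // The client handle owns the peer state (stored in its data field), so both
  // are freed here, once libuv guarantees no more callbacks will touch them.
  if (client->data) {
    free(client->data);
  }
  free(client);
}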

The peer state for this server is:

typedef struct {
  ProcessingState state;
  char sendbuf[SENDBUF_SIZE];
  int sendbuf_end;
  uv_tcp_t* client;
} peer_state_t;

It's fairly similar to the state in part 3; we no longer need sendptr, since uv_write will make sure to send the whole buffer it's given before invoking the "done writing" callback. We also keep a pointer to the client for other callbacks to use. Here's on_wrote_init_ack:

void on_wrote_init_ack(uv_write_t* req, int status) {
  if (status) {
    die("Write error: %s\n", uv_strerror(status));
  }
  peer_state_t* peerstate = (peer_state_t*)req->data;
  // Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data
  // from this peer.
  peerstate->state = WAIT_FOR_MSG;
  peerstate->sendbuf_end = 0;

  int rc;
  if ((rc = uv_read_start((uv_stream_t*)peerstate->client, on_alloc_buffer,
                          on_peer_read)) < 0) {
    die("uv_read_start failed: %s", uv_strerror(rc));
  }

  // Note: the write request doesn't own the peer state, hence we only free the
  // request itself, not the state.
  free(req);
}

Now that we know for sure that the initial '*' was sent to the peer, we start listening for incoming data from this peer by calling uv_read_start, which registers a callback (on_peer_read) that will be invoked by the event loop whenever new data is received on the socket from the client (a sketch of the buffer-allocation callback on_alloc_buffer appears right after the listing):

void on_peer_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
  if (nread < 0) {
    if (nread != UV_EOF) {
      fprintf(stderr, "read error: %s\n", uv_strerror(nread));
    }
    uv_close((uv_handle_t*)client, on_client_closed);
  } else if (nread == 0) {
    // From the documentation of uv_read_cb: nread might be 0, which does not
    // indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK
    // under read(2).
  } else {
    // nread > 0
    assert(buf->len >= nread);

    peer_state_t* peerstate = (peer_state_t*)client->data;
    if (peerstate->state == INITIAL_ACK) {
      // If the initial ACK hasn't been sent for some reason, ignore whatever
      // the client sends in.
      free(buf->base);
      return;
    }

    // Run the protocol state machine.
    for (int i = 0; i < nread; ++i) {
      switch (peerstate->state) {
      case INITIAL_ACK:
        assert(0 && "can't reach here");
        break;
      case WAIT_FOR_MSG:
        if (buf->base[i] == '^') {
          peerstate->state = IN_MSG;
        }
        break;
      case IN_MSG:
        if (buf->base[i] == '$') {
          peerstate->state = WAIT_FOR_MSG;
        } else {
          assert(peerstate->sendbuf_end < SENDBUF_SIZE);
          peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1;
        }
        break;
      }
    }

    if (peerstate->sendbuf_end > 0) {
      // We have data to send. The write buffer will point to the buffer stored
      // in the peer state for this client.
      uv_buf_t writebuf =
          uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
      uv_write_t* writereq = (uv_write_t*)xmalloc(sizeof(*writereq));
      writereq->data = peerstate;
      int rc;
      if ((rc = uv_write(writereq, (uv_stream_t*)client, &writebuf, 1,
                         on_wrote_buf)) < 0) {
        die("uv_write failed: %s", uv_strerror(rc));
      }
    }
  }
  free(buf->base);
}
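
For completeness, here's roughly what the on_alloc_buffer callback registered with uv_read_start can look like - libuv asks the application for a buffer before every read, and on_peer_read frees it once the data is consumed. This is a sketch of the idiom rather than a verbatim copy from the full sample; on_wrote_buf, the write-completion callback used above, is analogous to on_wrote_init_ack:

void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  // Hand libuv a freshly allocated buffer for the upcoming read.
  buf->base = (char*)xmalloc(suggested_size);
  buf->len = suggested_size;
}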

The runtime behavior of this server is very similar to the event-driven servers of part 3: all clients are handled concurrently in a single thread. Also similarly, a certain discipline has to be maintained in the server's code: the server's logic is implemented as an ensemble of callbacks, and long-running operations are a big no-no since they block the event loop. Let's explore this issue a bit further.

Long-running operations in event-driven loops

The single-threaded nature of event-driven code makes it very susceptible to a common issue: long-running code blocks the entire loop. Consider this program:

void on_timer(uv_timer_t* timer) {
  uint64_t timestamp = uv_hrtime();
  printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);

  // "Work"
  if (random() % 5 == 0) {
    printf("Sleeping...\n");
    sleep(3);
  }
}

int main(int argc, const char** argv) {
  uv_timer_t timer;
  uv_timer_init(uv_default_loop(), &timer);
  uv_timer_start(&timer, on_timer, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

It runs a libuv event loop with a single registered callback: on_timer, which is invoked by the loop every second. The callback reports a timestamp, and once in a while simulates some long-running task by sleeping for 3 seconds. Here's a sample run:

$ ./uv-timer-sleep-demo
on_timer [4840 ms]
on_timer [5842 ms]
on_timer [6843 ms]
on_timer [7844 ms]
Sleeping...
on_timer [11845 ms]
on_timer [12846 ms]
Sleeping...
on_timer [16847 ms]
on_timer [17849 ms]
on_timer [18850 ms]
...

on_timer dutifully fires every second, until the random sleep kicks in. At that point, on_timer is not invoked again until the sleep is over; in fact, no other callbacks will be invoked in this time frame. The sleep call blocks the current thread, which is the only thread involved and is also the thread the event loop uses. When this thread is blocked, the event loop is blocked.

This example demonstrates why it's so important for callbacks to never block in event-driven code; this applies equally to Node.js servers, client-side Javascript, most GUI programming frameworks, and many other asynchronous programming models.

But sometimes running time-consuming tasks is unavoidable. Not all tasks have asynchronous APIs; for example, we may be dealing with some library that only has a synchronous API, or just have to perform a potentially long computation. How can we combine such code with event-driven programming? Threads to the rescue!

Threads for "converting" blocking calls into asynchronous calls

A thread pool can be used to turn blocking calls into asynchronous calls, by running alongside the event loop and posting events onto it when tasks are completed. Here's how it works, for a given blocking function do_work():

  1. Instead of directly calling do_work() in a callback, we package it into a "task" and ask the thread pool to execute the task. We also register a callback for the loop to invoke when the task has finished; let's call it on_work_done().
  2. At this point our callback can return and the event loop keeps spinning; at the same time, a thread in the pool is executing the task.
  3. Once the task has finished executing, the main thread (the one running the event loop) is notified and on_work_done() is invoked by the event loop.

Let's see how this solves our previous timer/sleep example, using libuv's work scheduling API:

void on_after_work(uv_work_t* req, int status) {
  free(req);
}

void on_work(uv_work_t* req) {
  // "Work"
  if (random() % 5 == 0) {
    printf("Sleeping...\n");
    sleep(3);
  }
}

void on_timer(uv_timer_t* timer) {
  uint64_t timestamp = uv_hrtime();
  printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);

  uv_work_t* work_req = (uv_work_t*)malloc(sizeof(*work_req));
  uv_queue_work(uv_default_loop(), work_req, on_work, on_after_work);
}

int main(int argc, const char** argv) {
  uv_timer_t timer;
  uv_timer_init(uv_default_loop(), &timer);
  uv_timer_start(&timer, on_timer, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

Instead of calling sleep directly in on_timer, we enqueue a task, represented by a uv_work_t handle (work_req) [2], along with the function to run in the task (on_work) and the function to invoke once the task has completed (on_after_work). on_work is where the "work" (the blocking/time-consuming operation) happens. Note a crucial difference between the two callbacks passed into uv_queue_work: on_work runs in the thread pool, while on_after_work runs on the main thread which also runs the event loop - just like any other callback.

Let's see this version run:

$ ./uv-timer-work-demo
on_timer [89571 ms]
on_timer [90572 ms]
on_timer [91573 ms]
on_timer [92575 ms]
Sleeping...
on_timer [93576 ms]
on_timer [94577 ms]
Sleeping...
on_timer [95577 ms]
on_timer [96578 ms]
on_timer [97578 ms]
...

The timer ticks every second, even though the sleeping function is still invoked; sleeping is now done on a separate thread and doesn't block the event loop.

A primality-testing server, with exercises

Since sleep isn't a very exciting way to simulate work, I've prepared a more comprehensive example - a server that accepts numbers from clients over a socket, checks whether these numbers are prime and sends back either "prime" or "composite". The full code for this server is here - I won't post it here since it's long, but will rather give readers the opportunity to explore it on their own with a couple of exercises.

The server deliberately uses a naive primality test algorithm, so for large primes it can take quite a while to return an answer. On my machine it takes ~5 seconds to compute the answer for 2305843009213693951, but YMMV.
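
For reference, a naive test along these lines is just trial division; the following is an illustrative sketch, not necessarily the exact algorithm used in the sample:

#include <stdbool.h>
#include <stdint.h>

// Trial division up to sqrt(n). For a large prime like 2305843009213693951
// (2^61 - 1), the loop runs on the order of 10^9 times, which is why the
// answer takes seconds to compute.
bool is_prime_naive(uint64_t n) {
  if (n < 2) return false;
  if (n % 2 == 0) return n == 2;
  for (uint64_t d = 3; d * d <= n; d += 2) {
    if (n % d == 0) {
      return false;
    }
  }
  return true;
}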

Exercise 1: the server has a setting (via an environment variable named MODE) to either run the primality test in the socket callback (meaning on the main thread) or in the libuv work queue. Play with this setting to observe the server's behavior when multiple clients are connecting simultaneously. In blocking mode, the server will not answer other clients while it's computing a big task; in non-blocking mode it will.

Exercise 2: libuv has a default thread-pool size, and it can be configured via an environment variable. Can you use multiple clients to discover experimentally what the default size is? Having found the default thread-pool size, play with different settings to see how it affects the server's responsiveness under heavy load.

Non-blocking file-system operations using work queues

Delegating potentially-blocking operations to a thread pool isn't good for just silly demos and CPU-intensive computations; libuv itself makes heavy use of this capability in its file-system APIs. This way, libuv accomplishes the superpower of exposing the file-system with an asynchronous API, in a portable way.

Let's take uv_fs_read(), for example. This function reads from a file (identified by a uv_file descriptor, with a uv_fs_t request tracking the operation) into a buffer [3], and invokes a callback when the reading is completed. That is, uv_fs_read() always returns immediately, even if the file sits on an NFS-like system and it may take a while for the data to get to the buffer. In other words, this API is asynchronous in the way other libuv APIs are. How does this work?
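
As a usage sketch (my own example, assuming a file named data.txt exists; error handling kept minimal), an asynchronous read looks like any other callback-driven libuv code:

#include <fcntl.h>
#include <stdio.h>
#include <uv.h>

static char readbuf[1024];
static uv_buf_t iov;
static uv_fs_t open_req, read_req;

void on_read(uv_fs_t* req) {
  if (req->result < 0) {
    fprintf(stderr, "read error: %s\n", uv_strerror((int)req->result));
  } else {
    printf("read %zd bytes\n", (ssize_t)req->result);
  }
  uv_fs_req_cleanup(req);
}

void on_open(uv_fs_t* req) {
  if (req->result < 0) {
    fprintf(stderr, "open error: %s\n", uv_strerror((int)req->result));
  } else {
    iov = uv_buf_init(readbuf, sizeof(readbuf));
    // Returns immediately; on_read is invoked later on the loop's thread.
    uv_fs_read(uv_default_loop(), &read_req, (uv_file)req->result, &iov, 1, -1,
               on_read);
  }
  uv_fs_req_cleanup(req);
}

int main() {
  uv_fs_open(uv_default_loop(), &open_req, "data.txt", O_RDONLY, 0, on_open);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}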

At this point we're going to look under the hood of libuv; the internals are actually fairly straightforward, and it's a good exercise. Being a portable library, libuv has different implementations of many of its functions for Windows and Unix systems. We're going to be looking at src/unix/fs.c in the libuv source tree.

The code for uv_fs_read is:

int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
               uv_file file,
               const uv_buf_t bufs[],
               unsigned int nbufs,
               int64_t off,
               uv_fs_cb cb) {
  if (bufs == NULL || nbufs == 0)
    return -EINVAL;

  INIT(READ);
  req->file = file;

  req->nbufs = nbufs;
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(*bufs));

  if (req->bufs == NULL) {
    if (cb != NULL)
      uv__req_unregister(loop, req);
    return -ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));

  req->off = off;
  POST;
}

It may seem puzzling at first, because it defers the real work to the INIT and POST macros, with some local variable setup for POST. This is done to avoid too much code duplication within the file.

The INIT macro is:

#define INIT(subtype)                                                         \
  do {                                                                        \
    req->type = UV_FS;                                                        \
    if (cb != NULL)                                                           \
      uv__req_init(loop, req, UV_FS);                                         \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)

It sets up the request, and most importantly sets the req->fs_type field to the actual FS request type. Since uv_fs_read invokes INIT(READ), it means req->fs_type gets assigned the constant UV_FS_READ.

The POST macro is:

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)

What it does depends on whether the callback is NULL. In libuv file-system APIs, a NULL callback means we actually want to perform the operation synchronously. In this case POST invokes uv__fs_work directly (we'll get to what this function does in just a bit), whereas for a non-NULL callback, it submits uv__fs_work as a work item to the work queue (which is the thread pool), and registers uv__fs_done as the callback; that function does a bit of book-keeping and invokes the user-provided callback.

If we look at the code of uv__fs_work, we'll see it uses more macros to route work to the actual file-system call as needed. In our case, for UV_FS_READ the call will be made to uv__fs_read, which (at last!) does the reading using regular POSIX APIs. This function can be safely implemented in a blocking manner, since it's placed on a thread-pool when called through the asynchronous API.

In Node.js, the fs.readFile function is mapped to uv_fs_read. Thus, reading files can be done in a non-blocking fashion even though the underlying file-system API is blocking.


[1]To ensure that this server doesn't leak memory, I ran it under Valgrind with the leak checker enabled. Since servers are often designed to run forever, this was a bit challenging; to overcome this issue I've added a "kill switch" to the server - a special sequence received from a client makes it stop the event loop and exit. The code for this is in the on_wrote_buf handler.
[2]Here we don't use work_req for much; the primality testing server discussed next will show how it's used to pass context information into the callback.
[3]uv_fs_read() provides a generalized API similar to the preadv Linux system call: it takes multiple buffers which it fills in order, and supports an offset into the file. We can ignore these features for the sake of our discussion.

Concurrent Servers: Part 3 - Event-driven



This is part 3 of a series of posts on writing concurrent network servers. Part 1 introduced the series with some building blocks, and part 2 - Threads discussed multiple threads as one viable approach for concurrency in the server.

Another common approach to achieve concurrency is called event-driven programming, or alternatively asynchronous programming [1]. The range of variations on this approach is very large, so we're going to start by covering the basics - using some of the fundamental APIs that form the base of most higher-level approaches. Future posts in the series will cover higher-level abstractions, as well as various hybrid approaches.

All posts in the series:

Blocking vs. nonblocking I/O

As an introduction to the topic, let's talk about the difference between blocking and nonblocking I/O. Blocking I/O is easier to understand, since this is the "normal" way we're used to I/O APIs working. When receiving data from a socket, a call to recv blocks until some data is received from the peer connected to the other side of the socket. This is precisely the issue with the sequential server of part 1.

So blocking I/O has an inherent performance problem. We saw one way to tackle this problem in part 2, using multiple threads. As long as one thread is blocked on I/O, other threads can continue using the CPU. In fact, blocking I/O is usually very efficient on resource usage while the thread is waiting - the thread is put to sleep by the OS and only wakes up when whatever it was waiting for is available.

Nonblocking I/O is a different approach. When a socket is set to nonblocking mode, a call to recv (and to send, but let's just focus on receiving here) will always return very quickly, even if there's no data to receive. In this case, it will return a special error status [2] notifying the caller that there's no data to receive at this time. The caller can then go do something else, or try to call recv again.

The difference between blocking and nonblocking recv is easiest to demonstrate with a simple code sample. Here's a small program that listens on a socket, continuously blocking on recv; when recv returns data, the program just reports how many bytes were received [3]:

int main(int argc, const char** argv) {
  setvbuf(stdout, NULL, _IONBF, 0);

  int portnum = 9988;
  if (argc >= 2) {
    portnum = atoi(argv[1]);
  }
  printf("Listening on port %d\n", portnum);

  int sockfd = listen_inet_socket(portnum);
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd = accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);
  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }
  report_peer_connected(&peer_addr, peer_addr_len);

  while (1) {
    uint8_t buf[1024];
    printf("Calling recv...\n");
    int len = recv(newsockfd, buf, sizeof buf, 0);
    if (len < 0) {
      perror_die("recv");
    } else if (len == 0) {
      printf("Peer disconnected; I'm done.\n");
      break;
    }
    printf("recv returned %d bytes\n", len);
  }

  close(newsockfd);
  close(sockfd);

  return 0;
}

The main loop repeatedly calls recv and reports what it returned (recall that recv returns 0 when the peer has disconnected). To try it out, we'll run this program in one terminal, and in a separate terminal connect to it with nc, sending a couple of short lines, separated by a delay of a couple of seconds:

$ nc localhost 9988
hello                                   # wait for 2 seconds after typing this
socket world
^D                                      # to end the connection

The listening program will print the following:

$ ./blocking-listener 9988
Listening on port 9988
peer (localhost, 37284) connected
Calling recv...
recv returned 6 bytes
Calling recv...
recv returned 13 bytes
Calling recv...
Peer disconnected; I'm done.

Now let's try a nonblocking version of the same listening program. Here it is:

int main(int argc, const char** argv) {
  setvbuf(stdout, NULL, _IONBF, 0);

  int portnum = 9988;
  if (argc >= 2) {
    portnum = atoi(argv[1]);
  }
  printf("Listening on port %d\n", portnum);

  int sockfd = listen_inet_socket(portnum);
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd = accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);
  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }
  report_peer_connected(&peer_addr, peer_addr_len);

  // Set nonblocking mode on the socket.
  int flags = fcntl(newsockfd, F_GETFL, 0);
  if (flags == -1) {
    perror_die("fcntl F_GETFL");
  }

  if (fcntl(newsockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
    perror_die("fcntl F_SETFL O_NONBLOCK");
  }

  while (1) {
    uint8_t buf[1024];
    printf("Calling recv...\n");
    int len = recv(newsockfd, buf, sizeof buf, 0);
    if (len < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        usleep(200 * 1000);
        continue;
      }
      perror_die("recv");
    } else if (len == 0) {
      printf("Peer disconnected; I'm done.\n");
      break;
    }
    printf("recv returned %d bytes\n", len);
  }

  close(newsockfd);
  close(sockfd);

  return 0;
}

A couple of notable differences from the blocking version:

  1. The newsockfd socket returned by accept is set to nonblocking mode by calling fcntl.
  2. When examining the return status of recv, we check whether errno is set to a value saying that no data is available for receiving. In this case we just sleep for 200 milliseconds and continue to the next iteration of the loop.

The same experiment with nc yields the following printout from this nonblocking listener:

$ ./nonblocking-listener 9988
Listening on port 9988
peer (localhost, 37288) connected
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
recv returned 6 bytes
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
recv returned 13 bytes
Calling recv...
Calling recv...
Calling recv...
Peer disconnected; I'm done.

As an exercise, add a timestamp to the printouts and convince yourself that the total time elapsed between fruitful calls to recv is more or less the delay in typing the lines into nc (rounded to the next 200 ms).

So there we have it - using nonblocking recv makes it possible for the listener to check in with the socket, and regain control if no data is available yet. Another word to describe this in the domain of programming is polling - the main program periodically polls the socket for its readiness.

It may seem like a potential solution to the sequential serving issue. Nonblocking recv makes it possible to work with multiple sockets simultaneously, polling them for data and only handling those that have new data. This is true - concurrent servers could be written this way; but in reality they aren't, because the polling approach scales very poorly.

First, the 200 ms delay I introduced in the code above is nice for the demonstration (the listener prints only a few lines of "Calling recv..." between my typing into nc as opposed to thousands), but it also incurs a delay of up to 200 ms to the server's response time, which is almost certainly undesirable. In real programs the delay would have to be much shorter, and the shorter the sleep, the more CPU the process consumes. These are cycles consumed for just waiting, which isn't great, especially on mobile devices where power matters.

But the bigger problem happens when we actually have to work with multiple sockets this way. Imagine this listener is handling 1000 clients concurrently. This means that in every loop iteration, it has to do a nonblocking recv on each and every one of those 1000 sockets, looking for one which has data ready. This is terribly inefficient, and severely limits the number of clients this server can handle concurrently. There's a catch-22 here: the longer we wait between polls, the less responsive the server is; the shorter we wait, the more CPU resources we burn on useless polling.

Frankly, all this polling also feels like useless work. Surely somewhere in the OS it is known which socket is actually ready with data, so we don't have to scan all of them. Indeed, it is, and the rest of this post will showcase a couple of APIs that let us handle multiple clients much more gracefully.

select

The select system call is a portable (POSIX), venerable part of the standard Unix API. It was designed precisely for the problem described towards the end of the previous section - to allow a single thread to "watch" a non-trivial number of file descriptors [4] for changes, without needlessly spinning in a polling loop. I don't plan to include a comprehensive tutorial for select in this post - there are many websites and book chapters for that - but I will describe its API in the context of the problem we're trying to solve, and will present a fairly complete example.

select enables I/O multiplexing - monitoring multiple file descriptors to see if I/O is possible on any of them.

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

readfds points to a buffer of file descriptors we're watching for read events; fd_set is an opaque data structure users manipulate using FD_* macros. writefds is the same for write events. nfds is the highest file descriptor number (file descriptors are just integers) in the watched buffers, plus one. timeout lets the user specify how long select should block waiting for one of the file descriptors to be ready (timeout == NULL means block indefinitely). I'll ignore exceptfds for now.

The contract of calling select is as follows (a minimal sketch of this flow appears right after the list):

  1. Prior to the call, the user has to create fd_set instances for all the different kinds of descriptors to watch. If we want to watch for both read events and write events, both readfds and writefds should be created and populated.
  2. The user uses FD_SET to set specific descriptors to watch in the set. For example, if we want to watch descriptors 2, 7 and 10 for read events, we call FD_SET three times on readfds, once for each of 2, 7 and 10.
  3. select is called.
  4. When select returns (let's ignore timeouts for now), it says how many descriptors in the sets passed to it are ready. It also modifies the readfds and writefds sets to mark only those descriptors that are ready. All the other descriptors are cleared.
  5. At this point the user has to iterate over readfds and writefds to find which descriptors are ready (using FD_ISSET).
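
Here's that minimal sketch, watching two hypothetical descriptors (sockfd1 and sockfd2) for read readiness; it only illustrates the contract and isn't taken from the server code discussed below:

fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sockfd1, &readfds);
FD_SET(sockfd2, &readfds);
int maxfd = sockfd1 > sockfd2 ? sockfd1 : sockfd2;

// Blocks until at least one descriptor is ready (timeout == NULL).
int nready = select(maxfd + 1, &readfds, NULL, NULL, NULL);
if (nready < 0) {
  perror_die("select");
}

// select cleared all the non-ready descriptors from readfds; check which
// ones remain set.
if (FD_ISSET(sockfd1, &readfds)) {
  // ... sockfd1 is ready for reading
}
if (FD_ISSET(sockfd2, &readfds)) {
  // ... sockfd2 is ready for reading
}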

As a complete example, I've reimplemented our protocol in a concurrent server that uses select. The full code is here; what follows is some highlights from the code, with explanations. Warning: this code sample is fairly substantial - so feel free to skip it on first reading if you're short on time.

A concurrent server using select

Using an I/O multiplexing API like select imposes certain constraints on the design of our server; these may not be immediately obvious, but are worth discussing since they are key to understanding what event-driven programming is all about.

Most importantly, always keep in mind that such an approach is, at its core, single-threaded [5]. The server really is just doing one thing at a time. Since we want to handle multiple clients concurrently, we'll have to structure the code in an unusual way.

First, let's talk about the main loop. How would that look? To answer this question let's imagine our server during a flurry of activity - what should it watch for? Two kinds of socket activities:

  1. New clients trying to connect. These clients should be accept-ed.
  2. Existing clients sending data. This data has to go through the usual protocol described in part 1, with perhaps some data being sent back.

Even though these two activities are somewhat different in nature, we'll have to mix them into the same loop, because there can only be one main loop. Our loop will revolve around calls to select. This select call will watch for the two kinds of events described above.

Here's the part of the code that sets up the file descriptor sets and kicks off the main loop with a call to select:

// The "master" sets are owned by the loop, tracking which FDs we want to
// monitor for reading and which FDs we want to monitor for writing.
fd_set readfds_master;
FD_ZERO(&readfds_master);
fd_set writefds_master;
FD_ZERO(&writefds_master);

// The listening socket is always monitored for read, to detect when new
// peer connections are incoming.
FD_SET(listener_sockfd, &readfds_master);

// For more efficiency, fdset_max tracks the maximal FD seen so far; this
// makes it unnecessary for select to iterate all the way to FD_SETSIZE on
// every call.
int fdset_max = listener_sockfd;

while (1) {
  // select() modifies the fd_sets passed to it, so we have to pass in copies.
  fd_set readfds = readfds_master;
  fd_set writefds = writefds_master;

  int nready = select(fdset_max + 1, &readfds, &writefds, NULL, NULL);
  if (nready < 0) {
    perror_die("select");
  }
  ...

A couple of points of interest here:

  1. Since every call to select overwrites the sets given to the function, the caller has to maintain a "master" set to keep track of all the active sockets it monitors across loop iterations.
  2. Note how, initially, the only socket we care about is listener_sockfd, which is the original socket on which the server accepts new clients.
  3. The return value of select is the number of descriptors that are ready among those in the sets passed as arguments. The sets are modified by select to mark ready descriptors. The next step is iterating over the descriptors.
...
for (int fd = 0; fd <= fdset_max && nready > 0; fd++) {
  // Check if this fd became readable.
  if (FD_ISSET(fd, &readfds)) {
    nready--;

    if (fd == listener_sockfd) {
      // The listening socket is ready; this means a new peer is connecting.
      ...
    } else {
      fd_status_t status = on_peer_ready_recv(fd);
      if (status.want_read) {
        FD_SET(fd, &readfds_master);
      } else {
        FD_CLR(fd, &readfds_master);
      }
      if (status.want_write) {
        FD_SET(fd, &writefds_master);
      } else {
        FD_CLR(fd, &writefds_master);
      }
      if (!status.want_read && !status.want_write) {
        printf("socket %d closing\n", fd);
        close(fd);
      }
    }

This part of the loop checks the readable descriptors. Let's skip the listener socket (for the full scoop - read the code) and see what happens when one of the client sockets is ready. When this happens, we call a callback function named on_peer_ready_recv with the file descriptor for the socket. This call means the client connected to that socket sent some data and a call to recv on the socket isn't expected to block [6]. This callback returns a struct of type fd_status_t:

typedef struct {
  bool want_read;
  bool want_write;
} fd_status_t;

Which tells the main loop whether the socket should be watched for read events, write events, or both. The code above shows how FD_SET and FD_CLR are called on the appropriate descriptor sets accordingly. The code for a descriptor being ready for writing in the main loop is similar, except that the callback it invokes is called on_peer_ready_send.

Now it's time to look at the code for the callback itself:

typedef enum { INITIAL_ACK, WAIT_FOR_MSG, IN_MSG } ProcessingState;

#define SENDBUF_SIZE 1024

typedef struct {
  ProcessingState state;

  // sendbuf contains data the server has to send back to the client. The
  // on_peer_ready_recv handler populates this buffer, and on_peer_ready_send
  // drains it. sendbuf_end points to the last valid byte in the buffer, and
  // sendptr at the next byte to send.
  uint8_t sendbuf[SENDBUF_SIZE];
  int sendbuf_end;
  int sendptr;
} peer_state_t;

// Each peer is globally identified by the file descriptor (fd) it's connected
// on. As long as the peer is connected, the fd is unique to it. When a peer
// disconnects, a new peer may connect and get the same fd. on_peer_connected
// should initialize the state properly to remove any trace of the old peer on
// the same fd.
peer_state_t global_state[MAXFDS];

fd_status_t on_peer_ready_recv(int sockfd) {
  assert(sockfd < MAXFDS);
  peer_state_t* peerstate = &global_state[sockfd];

  if (peerstate->state == INITIAL_ACK ||
      peerstate->sendptr < peerstate->sendbuf_end) {
    // Until the initial ACK has been sent to the peer, there's nothing we
    // want to receive. Also, wait until all data staged for sending is sent to
    // receive more data.
    return fd_status_W;
  }

  uint8_t buf[1024];
  int nbytes = recv(sockfd, buf, sizeof buf, 0);
  if (nbytes == 0) {
    // The peer disconnected.
    return fd_status_NORW;
  } else if (nbytes < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // The socket is not *really* ready for recv; wait until it is.
      return fd_status_R;
    } else {
      perror_die("recv");
    }
  }
  bool ready_to_send = false;
  for (int i = 0; i < nbytes; ++i) {
    switch (peerstate->state) {
    case INITIAL_ACK:
      assert(0 && "can't reach here");
      break;
    case WAIT_FOR_MSG:
      if (buf[i] == '^') {
        peerstate->state = IN_MSG;
      }
      break;
    case IN_MSG:
      if (buf[i] == '$') {
        peerstate->state = WAIT_FOR_MSG;
      } else {
        assert(peerstate->sendbuf_end < SENDBUF_SIZE);
        peerstate->sendbuf[peerstate->sendbuf_end++] = buf[i] + 1;
        ready_to_send = true;
      }
      break;
    }
  }
  // Report reading readiness iff there's nothing to send to the peer as a
  // result of the latest recv.
  return (fd_status_t){.want_read = !ready_to_send,
                       .want_write = ready_to_send};
}

A peer_state_t is the full state object used to represent a client connection between callback calls from the main loop. Since a callback is invoked on some partial data sent by the client, it cannot assume it will be able to communicate with the client continuously, and it has to run quickly without blocking. It never blocks because the socket is set to non-blocking mode and recv will always return quickly. Other than calling recv, all this handler does is manipulate the state - there are no additional calls that could potentially block.

As an exercise, can you figure out why this code needs an extra state? Our servers so far in the series managed with just two states, but this one needs three.

Let's also have a look at the "socket ready to send" callback:

fd_status_t on_peer_ready_send(int sockfd) {
  assert(sockfd < MAXFDS);
  peer_state_t* peerstate = &global_state[sockfd];

  if (peerstate->sendptr >= peerstate->sendbuf_end) {
    // Nothing to send.
    return fd_status_RW;
  }
  int sendlen = peerstate->sendbuf_end - peerstate->sendptr;
  int nsent = send(sockfd, &peerstate->sendbuf[peerstate->sendptr], sendlen, 0);
  if (nsent == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      return fd_status_W;
    } else {
      perror_die("send");
    }
  }
  if (nsent < sendlen) {
    peerstate->sendptr += nsent;
    return fd_status_W;
  } else {
    // Everything was sent successfully; reset the send queue.
    peerstate->sendptr = 0;
    peerstate->sendbuf_end = 0;

    // Special-case state transition if we were in INITIAL_ACK until now.
    if (peerstate->state == INITIAL_ACK) {
      peerstate->state = WAIT_FOR_MSG;
    }

    return fd_status_R;
  }
}

Same here - the callback calls a non-blocking send and performs state manipulation. In asynchronous code, it's critical for callbacks to do their work quickly - any delay blocks the main loop from making progress, and thus blocks the whole server from handling other clients.

Let's once again repeat a run of the server with the script that connects 3 clients simultaneously. In one terminal window we'll run:

$ ./select-server

In another:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-26 05:29:15,864:conn1 connected...
INFO:2017-09-26 05:29:15,864:conn2 connected...
INFO:2017-09-26 05:29:15,864:conn0 connected...
INFO:2017-09-26 05:29:15,865:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-26 05:29:15,865:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-26 05:29:15,865:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-26 05:29:15,865:conn1 received b'bcdbcuf'
INFO:2017-09-26 05:29:15,865:conn2 received b'bcdbcuf'
INFO:2017-09-26 05:29:15,865:conn0 received b'bcdbcuf'
INFO:2017-09-26 05:29:16,866:conn1 sending b'xyz^123'
INFO:2017-09-26 05:29:16,867:conn0 sending b'xyz^123'
INFO:2017-09-26 05:29:16,867:conn2 sending b'xyz^123'
INFO:2017-09-26 05:29:16,867:conn1 received b'234'
INFO:2017-09-26 05:29:16,868:conn0 received b'234'
INFO:2017-09-26 05:29:16,868:conn2 received b'234'
INFO:2017-09-26 05:29:17,868:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-26 05:29:17,869:conn1 received b'36bc1111'
INFO:2017-09-26 05:29:17,869:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-26 05:29:17,870:conn0 received b'36bc1111'
INFO:2017-09-26 05:29:17,870:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-26 05:29:17,870:conn2 received b'36bc1111'
INFO:2017-09-26 05:29:18,069:conn1 disconnecting
INFO:2017-09-26 05:29:18,070:conn0 disconnecting
INFO:2017-09-26 05:29:18,070:conn2 disconnecting

Similarly to the threaded case, there's no delay between clients - they are all handled concurrently. And yet, there are no threads in sight in select-server! The main loop multiplexes all the clients by efficient polling of multiple sockets using select. Recall the sequential vs. multi-threaded client handling diagrams from part 2. For our select-server, the time flow for three clients looks something like this:

Multiplexed client-handling flow

All clients are handled concurrently within the same thread, by multiplexing - doing some work for a client, switching to another, then another, then going back to the original client, etc. Note that there's no specific round-robin order here - the clients are handled when they send data to the server, which really depends on the client.

Synchronous, asynchronous, event-driven, callback-based

The select-server code sample provides a good background for discussing just what is meant by "asynchronous" programming, and how it relates to event-driven and callback-based programming, because all these terms are common in the (rather inconsistent) discussion of concurrent servers.

Let's start with a quote from select's man page:

select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing

So select is for synchronous multiplexing. But I've just presented a substantial code sample using select as an example of an asynchronous server; what gives?

The answer is: it depends on your point of view. Synchronous is often used as a synonym for blocking, and the calls to select are, indeed, blocking. So are the calls to send and recv in the sequential and threaded servers presented in parts 1 and 2. So it is fair to say that select is a synchronous API. However, the server design emerging from the use of select is actually asynchronous, or callback-based, or event-driven. Note that the on_peer_* functions presented in this post are callbacks; they should never block, and they get invoked due to network events. They can get partial data, and are expected to retain coherent state in-between invocations.

If you've done any amount of GUI programming in the past, all of this is very familiar. There's an "event loop" that's often entirely hidden in frameworks, and the application's "business logic" is built out of callbacks that get invoked by the event loop due to various events - user mouse clicks, menu selections, timers firing, data arriving on sockets, etc. The most ubiquitous model of programming these days is, of course, client-side Javascript, which is written as a bunch of callbacks invoked by user activity on a web page.

The limitations of select

Using select for our first example of an asynchronous server makes sense to present the concept, and also because select is such a ubiquitous and portable API. But it also has some significant limitations that manifest when the number of watched file descriptors is very large:

  1. Limited file descriptor set size.
  2. Bad performance.

Let's start with the file descriptor set size. FD_SETSIZE is a compile-time constant that's usually equal to 1024 on modern systems. It's hard-coded deep in the guts of glibc, and isn't easy to modify. It limits the number of file descriptors a select call can watch to 1024. These days folks want to write servers that handle 10s of thousands of concurrent clients and more, so this problem is real. There are workarounds, but they aren't portable and aren't easy.
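
If you want to check the value on your own system, a trivial program suffices (this is just an illustration; the constant comes from the system headers):

#include <stdio.h>
#include <sys/select.h>

int main() {
  // On typical glibc-based Linux systems this prints 1024.
  printf("FD_SETSIZE = %d\n", FD_SETSIZE);
  return 0;
}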

The bad performance issue is a bit more subtle, but still very serious. Note that when select returns, the information it provides to the caller is the number of "ready" descriptors, and the updated descriptor sets. The descriptor sets map from descriptor to "ready/not ready", but they don't provide a way to iterate over all the ready descriptors efficiently. If there's only a single descriptor that is ready in the set, in the worst case the caller has to iterate over the entire set to find it. This works OK when the number of descriptors watched is small, but when it gets to high numbers this overhead starts hurting [7].

For these reasons select has recently fallen out of favor for writing high-performance concurrent servers. Every popular OS has its own, non-portable APIs that permit users to write much more performant event loops; higher-level interfaces like frameworks and high-level languages usually wrap these APIs in a single portable interface.

epoll

As an example, let's look at epoll, Linux's solution to the high-volume I/O event notification problem. The key to epoll's efficiency is greater cooperation from the kernel. Instead of using a file descriptor set, epoll_wait fills a buffer with events that are currently ready. Only the ready events are added to the buffer, so there is no need to iterate over all the currently watched file descriptors in the client. This changes the process of discovering which descriptors are ready from O(N) in select's case to O(1).

A full presentation of the epoll API is not the goal here - there are plenty of online resources for that. As you may have guessed, though, I am going to write yet another version of our concurrent server - this time using epoll instead of select. The full code sample is here. In fact, since the vast majority of the code is the same as select-server, I'll only focus on the novelty - the use of epoll in the main loop:

struct epoll_event accept_event;
accept_event.data.fd = listener_sockfd;
accept_event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listener_sockfd, &accept_event) < 0) {
  perror_die("epoll_ctl EPOLL_CTL_ADD");
}

struct epoll_event* events = calloc(MAXFDS, sizeof(struct epoll_event));
if (events == NULL) {
  die("Unable to allocate memory for epoll_events");
}

while (1) {
  int nready = epoll_wait(epollfd, events, MAXFDS, -1);
  for (int i = 0; i < nready; i++) {
    if (events[i].events & EPOLLERR) {
      perror_die("epoll_wait returned EPOLLERR");
    }

    if (events[i].data.fd == listener_sockfd) {
      // The listening socket is ready; this means a new peer is connecting.
      ...
    } else {
      // A peer socket is ready.
      if (events[i].events & EPOLLIN) {
        // Ready for reading.
        ...
      } else if (events[i].events & EPOLLOUT) {
        // Ready for writing.
        ...
      }
    }
  }
}

We start by configuring epoll with a call to epoll_ctl. In this case, the configuration amounts to adding the listening socket to the descriptors epoll is watching for us. We then allocate a buffer of ready events to pass to epoll for modification. The call to epoll_wait in the main loop is where the magic's at. It blocks until one of the watched descriptors is ready (or until a timeout expires), and returns the number of ready descriptors. This time, however, instead of blindly iterating over all the watched sets, we know that epoll_wait populated the events buffer passed to it with the ready events, from 0 to nready-1, so we iterate only the strictly necessary number of times.
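
One detail elided from the excerpt is where epollfd comes from: the epoll instance is created once, before the main loop. Here's a sketch of that step (the full sample also registers each newly accepted peer socket with EPOLL_CTL_ADD, and its error handling may differ):

int epollfd = epoll_create1(0);
if (epollfd < 0) {
  perror_die("epoll_create1");
}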

To reiterate this critical difference from select: if we're watching 1000 descriptors and two become ready, epoll_wait returns nready=2 and populates the first two elements of the events buffer - so we only "iterate" over two descriptors. With select we'd still have to iterate over 1000 descriptors to find out which ones are ready. For this reason epoll scales much better than select for busy servers with many active sockets.

The rest of the code is straightforward, since we're already familiar with select-server. In fact, all the "business logic" of epoll-server is exactly the same as for select-server - the callbacks consist of the same code.

This similarity is tempting to exploit by abstracting away the event loop into a library/framework. I'm going to resist this itch, because so many great programmers succumbed to it in the past. Instead, in the next post we're going to look at libuv - one of the more popular event loop abstractions emerging recently. Libraries like libuv allow us to write concurrent asynchronous servers without worrying about the greasy details of the underlying system calls.


[1]I tried enlightening myself on the actual semantic difference between the two by doing some web browsing and reading, but got a headache fairly quickly. There are many different opinions ranging from "they're the same thing", to "one is a subset of another" to "they're completely different things". When faced with such divergent views on the semantics, it's best to abandon the issue entirely, focusing instead on specific examples and use cases.
[2]POSIX mandates that this can be either EAGAIN or EWOULDBLOCK, and portable applications should check for both.
[3]Similarly to all C samples in this series, this code uses some helper utilities to set up listening sockets. The full code for these utilities lives in the utils module in the repository.
[4]select is not a network/socket-specific function; it watches arbitrary file descriptors, which could be disk files, pipes, terminals, sockets or anything else Unix systems represent with file descriptors. In this post we're focusing on its uses for sockets, of course.
[5]There are ways to intermix event-driven programming with multiple threads, but I'll defer this discussion to later in the series.
[6]Due to various non-trivial reasons it could still block, even after select says it's ready. Therefore, all sockets opened by this server are set to nonblocking mode, and if the call to recv or send returns EAGAIN or EWOULDBLOCK, the callbacks just assume no event really happened. Read the code sample comments for more details.
[7]Note that this still isn't as bad as the asynchronous polling example presented earlier in the post. The polling has to happen all the time, while select actually blocks until one or more sockets are ready for reading/writing; far less CPU time is wasted with select than with repeated polling.

Concurrent Servers: Part 2 - Threads



This is part 2 of a series on writing concurrent network servers. Part 1 presented the protocol implemented by the server, as well as the code for a simple sequential server, as a baseline for the series.

In this part, we're going to look at multi-threading as one approach to concurrency, with a bare-bones threaded server implementation in C, as well as a thread pool based implementation in Python.

All posts in the series:

The multi-threaded approach to concurrent server design

When discussing the performance of the sequential server in part 1, it was immediately obvious that a lot of compute resources are wasted while the server processes a client connection. Even assuming a client that sends messages immediately and doesn't do any waiting, network communication is still involved; networks tend to be millions (or more) times slower than a modern CPU, so the CPU running the sequential server will spend the vast majority of time in glorious boredom waiting for new socket traffic to arrive.

Here's a chart showing how sequential client processing happens over time:

Sequential client-handling flow

The diagram shows 3 clients. The diamond shapes denote the client's "arrival time" (the time at which the client attempted to connect to the server). The black lines denote "wait time" (the time clients spent waiting for the server to actually accept their connection), and the colored bars denote actual "processing time" (the time server and client are interacting using the protocol). At the end of the colored bar, the client disconnects.

In the diagram above, even though the green and orange clients arrived shortly after the blue one, they have to wait for a while until the server is done with the blue client. At this point the green client is accepted, while the orange one has to wait even longer.

A multi-threaded server would launch multiple control threads, letting the OS manage concurrency on the CPU (and across multiple CPU cores). When a client connects, a thread is created to serve it, while the server is ready to accept more clients in the main thread. The time chart for this mode looks like the following:

Concurrent client-handling flow

One thread per client, in C using pthreads

Our first code sample in this post is a simple "one thread per client" server, written in C using the foundational pthreads API for multi-threading. Here's the main loop:

while (1) {
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd =
      accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);

  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }

  report_peer_connected(&peer_addr, peer_addr_len);
  pthread_t the_thread;

  thread_config_t* config = (thread_config_t*)malloc(sizeof(*config));
  if (!config) {
    die("OOM");
  }
  config->sockfd = newsockfd;
  pthread_create(&the_thread, NULL, server_thread, config);

  // Detach the thread - when it's done, its resources will be cleaned up.
  // Since the main thread lives forever, it will outlive the serving threads.
  pthread_detach(the_thread);
}

And this is the server_thread function:

void* server_thread(void* arg) {
  thread_config_t* config = (thread_config_t*)arg;
  int sockfd = config->sockfd;
  free(config);

  // This cast will work for Linux, but in general casting pthread_t to an
  // integral type isn't portable.
  unsigned long id = (unsigned long)pthread_self();
  printf("Thread %lu created to handle connection with socket %d\n", id,
         sockfd);
  serve_connection(sockfd);
  printf("Thread %lu done\n", id);
  return 0;
}

The thread "configuration" is passed as a thread_config_t structure:

typedef struct { int sockfd; } thread_config_t;

The pthread_create call in the main loop launches a new thread that runs the server_thread function. This thread terminates when server_thread returns. In turn, server_thread returns when serve_connection returns. serve_connection is exactly the same function from part 1.

In part 1 we used a script to launch multiple clients concurrently and observe how the server handles them. Let's do the same with the multithreaded server:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-20 06:31:56,632:conn1 connected...
INFO:2017-09-20 06:31:56,632:conn2 connected...
INFO:2017-09-20 06:31:56,632:conn0 connected...
INFO:2017-09-20 06:31:56,632:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-20 06:31:56,632:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-20 06:31:56,632:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-20 06:31:56,633:conn1 received b'b'
INFO:2017-09-20 06:31:56,633:conn2 received b'b'
INFO:2017-09-20 06:31:56,633:conn0 received b'b'
INFO:2017-09-20 06:31:56,670:conn1 received b'cdbcuf'
INFO:2017-09-20 06:31:56,671:conn0 received b'cdbcuf'
INFO:2017-09-20 06:31:56,671:conn2 received b'cdbcuf'
INFO:2017-09-20 06:31:57,634:conn1 sending b'xyz^123'
INFO:2017-09-20 06:31:57,634:conn2 sending b'xyz^123'
INFO:2017-09-20 06:31:57,634:conn1 received b'234'
INFO:2017-09-20 06:31:57,634:conn0 sending b'xyz^123'
INFO:2017-09-20 06:31:57,634:conn2 received b'234'
INFO:2017-09-20 06:31:57,634:conn0 received b'234'
INFO:2017-09-20 06:31:58,635:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-20 06:31:58,635:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-20 06:31:58,636:conn1 received b'36bc1111'
INFO:2017-09-20 06:31:58,636:conn2 received b'36bc1111'
INFO:2017-09-20 06:31:58,637:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-20 06:31:58,637:conn0 received b'36bc1111'
INFO:2017-09-20 06:31:58,836:conn2 disconnecting
INFO:2017-09-20 06:31:58,836:conn1 disconnecting
INFO:2017-09-20 06:31:58,837:conn0 disconnecting

Indeed, all clients connected at the same time, and their communication with the server occurs concurrently.

Challenges with one thread per client

Even though threads are fairly efficient in terms of resource usage on modern OSes, the approach outlined in the previous section can still present challenges with some workloads.

Imagine a scenario where many clients are connecting simultaneously, and some of the sessions are long-lived. This means that many threads may be active at the same time in the server. Too many threads can consume a large amount of memory and CPU time just for the context switching [1]. An alternative way to look at it is as a security problem: this design makes the server an easy target for a DoS attack - connect a few hundred thousand clients at the same time and let them all sit idle - this will likely kill the server due to excessive resource usage.

A larger problem occurs when there's a non-trivial amount of CPU-bound computation the server has to do for each client. In this case, swamping the server is considerably easier - just a few dozen clients can bring a server to its knees.

For these reasons, it's prudent to do some rate-limiting on the number of concurrent clients handled by a multi-threaded server. There are a number of ways to do this. The simplest that comes to mind is to simply count the number of clients currently connected and restrict that number to some limit (determined by careful benchmarking, hopefully). A variation on this approach that's very popular in concurrent application design is using a thread pool.
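
As a rough sketch of the counting approach (the cap of 64, the POSIX semaphore, and the function names are assumptions for illustration, not code from the repository): the accept loop blocks on a semaphore before spawning a thread, and each serving thread releases a slot when it's done.

#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>

// Hypothetical cap on concurrent clients; the right value comes from
// benchmarking the actual workload.
#define MAX_CONCURRENT_CLIENTS 64

// serve_connection is the same protocol handler used throughout this post.
void serve_connection(int sockfd);

static sem_t client_slots;

void* capped_server_thread(void* arg) {
  // For brevity this sketch smuggles the fd through the pointer argument;
  // the server above uses a malloc'd thread_config_t instead.
  int sockfd = (int)(intptr_t)arg;
  serve_connection(sockfd);
  sem_post(&client_slots);  // free a slot for the next waiting client
  return 0;
}

void capped_accept_loop(int listen_sockfd) {
  sem_init(&client_slots, 0, MAX_CONCURRENT_CLIENTS);
  while (1) {
    sem_wait(&client_slots);  // blocks once the cap is reached
    int newsockfd = accept(listen_sockfd, NULL, NULL);
    if (newsockfd < 0) {
      sem_post(&client_slots);  // accept failed; give the slot back
      continue;
    }
    pthread_t the_thread;
    pthread_create(&the_thread, NULL, capped_server_thread,
                   (void*)(intptr_t)newsockfd);
    pthread_detach(the_thread);
  }
}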

Thread pools

The idea of a thread pool is simple, yet powerful. The server creates a number of worker threads that all expect to get tasks from some queue. This is the "pool". Then, each client connection is dispatched as a task to the pool. As long as there's an idle thread in the pool, it's handed the task. If all the threads in the pool are currently busy, the server blocks until the pool accepts the task (which happens after one of the busy threads finishes processing its current task and goes back to an idle state).
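
To make the idea concrete, here's a bare-bones sketch of such a pool in C - a hand-rolled illustration with assumed names and a bounded queue of 16 tasks, not the implementation used by any server in this series. Worker threads pull client sockets from a queue protected by a mutex and condition variables, and the submitting side blocks while the queue is full.

#include <pthread.h>

// Fixed bound on queued tasks; submitters block when the queue is full.
#define QUEUE_CAPACITY 16

// serve_connection is the same protocol handler used throughout this post.
void serve_connection(int sockfd);

typedef struct {
  int tasks[QUEUE_CAPACITY];  // queued client sockets
  int head, tail, count;
  pthread_mutex_t lock;
  pthread_cond_t not_empty;
  pthread_cond_t not_full;
} task_queue_t;

static task_queue_t queue = {
    .lock = PTHREAD_MUTEX_INITIALIZER,
    .not_empty = PTHREAD_COND_INITIALIZER,
    .not_full = PTHREAD_COND_INITIALIZER,
};

// Called by the accept loop for each new client; blocks while the queue is full.
void pool_submit(int sockfd) {
  pthread_mutex_lock(&queue.lock);
  while (queue.count == QUEUE_CAPACITY) {
    pthread_cond_wait(&queue.not_full, &queue.lock);
  }
  queue.tasks[queue.tail] = sockfd;
  queue.tail = (queue.tail + 1) % QUEUE_CAPACITY;
  queue.count++;
  pthread_cond_signal(&queue.not_empty);
  pthread_mutex_unlock(&queue.lock);
}

// Each worker thread loops forever, serving one client at a time.
void* pool_worker(void* arg) {
  (void)arg;
  while (1) {
    pthread_mutex_lock(&queue.lock);
    while (queue.count == 0) {
      pthread_cond_wait(&queue.not_empty, &queue.lock);
    }
    int sockfd = queue.tasks[queue.head];
    queue.head = (queue.head + 1) % QUEUE_CAPACITY;
    queue.count--;
    pthread_cond_signal(&queue.not_full);
    pthread_mutex_unlock(&queue.lock);

    serve_connection(sockfd);
  }
  return 0;
}

// Launch nthreads workers; afterwards the pool is ready for pool_submit calls.
void pool_start(int nthreads) {
  for (int i = 0; i < nthreads; ++i) {
    pthread_t t;
    pthread_create(&t, NULL, pool_worker, NULL);
    pthread_detach(t);
  }
}

A main loop would then call pool_start once at startup and pool_submit for every accepted connection - exactly the shape of the Python version shown later in this post.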

Here's a diagram showing a pool of 4 threads, each processing a task. Tasks (client connections in our case) are waiting until one of the threads in the pool is ready to accept new tasks.

Thread pool diagram

It should be fairly obvious that the thread pool approach provides a rate-limiting mechanism in its very definition. We can decide ahead of time how many threads we want our server to have. Then, this is the maximal number of clients processed concurrently - the rest are waiting until one of the threads becomes free. If we have 8 threads in the pool, 8 is the maximal number of concurrent clients the server handles - even if thousands are attempting to connect simultaneously.

How do we decide how many threads should be in the pool? By a careful analysis of the problem domain, benchmarking, experimentation, and also by the hardware we have. If we have a single-core cloud instance, that's one answer; if we have a 100-core dual-socket server available, the answer is different. Picking the thread pool size can also be done dynamically at runtime based on load - I'll touch upon this topic in future posts in this series.

Servers that use thread pools manifest graceful degradation in the face of high load - clients are accepted at some steady rate, potentially slower than their rate of arrival for some periods of time; that said, no matter how many clients are trying to connect simultaneously, the server will remain responsive and will just churn through the backlog of clients to the best of its ability. Contrast this with the one-thread-per-client server, which can merrily accept a large number of clients until it gets overloaded, at which point it's likely to either crash or start working very slowly for all processed clients due to resource exhaustion (such as virtual memory thrashing).

Using a thread pool for our network server

For this variation of the server I've switched to Python, which comes with a robust implementation of a thread pool in the standard library (ThreadPoolExecutor from the concurrent.futures module) [2].

This server creates a thread pool, then loops to accept new clients on the main listening socket. Each connected client is dispatched into the pool with submit:

pool = ThreadPoolExecutor(args.n)
sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sockobj.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sockobj.bind(('localhost', args.port))
sockobj.listen(15)

try:
    while True:
        client_socket, client_address = sockobj.accept()
        pool.submit(serve_connection, client_socket, client_address)
except KeyboardInterrupt as e:
    print(e)
    sockobj.close()

The serve_connection function is very similar to its C counterpart, serving a single client until the client disconnects, while following our protocol:

ProcessingState = Enum('ProcessingState', 'WAIT_FOR_MSG IN_MSG')


def serve_connection(sockobj, client_address):
    print('{0} connected'.format(client_address))
    sockobj.sendall(b'*')
    state = ProcessingState.WAIT_FOR_MSG

    while True:
        try:
            buf = sockobj.recv(1024)
            if not buf:
                break
        except IOError as e:
            break
        for b in buf:
            if state == ProcessingState.WAIT_FOR_MSG:
                if b == ord(b'^'):
                    state = ProcessingState.IN_MSG
            elif state == ProcessingState.IN_MSG:
                if b == ord(b'$'):
                    state = ProcessingState.WAIT_FOR_MSG
                else:
                    sockobj.send(bytes([b + 1]))
            else:
                assert False

    print('{0} done'.format(client_address))
    sys.stdout.flush()
    sockobj.close()

Let's see how the thread pool size affects the blocking behavior for multiple concurrent clients. For demonstration purposes, I'll run the threadpool server with a pool size of 2 (only two threads are created to service clients):

$ python3.6 threadpool-server.py -n 2

And in a separate terminal, let's run the client simulator again, with 3 concurrent clients:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-22 05:58:52,815:conn1 connected...
INFO:2017-09-22 05:58:52,827:conn0 connected...
INFO:2017-09-22 05:58:52,828:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-22 05:58:52,828:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-22 05:58:52,828:conn1 received b'b'
INFO:2017-09-22 05:58:52,828:conn0 received b'b'
INFO:2017-09-22 05:58:52,867:conn1 received b'cdbcuf'
INFO:2017-09-22 05:58:52,867:conn0 received b'cdbcuf'
INFO:2017-09-22 05:58:53,829:conn1 sending b'xyz^123'
INFO:2017-09-22 05:58:53,829:conn0 sending b'xyz^123'
INFO:2017-09-22 05:58:53,830:conn1 received b'234'
INFO:2017-09-22 05:58:53,831:conn0 received b'2'
INFO:2017-09-22 05:58:53,831:conn0 received b'34'
INFO:2017-09-22 05:58:54,831:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-22 05:58:54,832:conn1 received b'36bc1111'
INFO:2017-09-22 05:58:54,832:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-22 05:58:54,833:conn0 received b'36bc1111'
INFO:2017-09-22 05:58:55,032:conn1 disconnecting
INFO:2017-09-22 05:58:55,032:conn2 connected...
INFO:2017-09-22 05:58:55,033:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-22 05:58:55,033:conn0 disconnecting
INFO:2017-09-22 05:58:55,034:conn2 received b'b'
INFO:2017-09-22 05:58:55,071:conn2 received b'cdbcuf'
INFO:2017-09-22 05:58:56,036:conn2 sending b'xyz^123'
INFO:2017-09-22 05:58:56,036:conn2 received b'234'
INFO:2017-09-22 05:58:57,037:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-22 05:58:57,038:conn2 received b'36bc1111'
INFO:2017-09-22 05:58:57,238:conn2 disconnecting

Recall the behavior of previously discussed servers:

  1. In the sequential server, all connections were serialized. One finished, and only then the next started.
  2. In the thread-per-client server earlier in this post, all connections were accepted and serviced concurrently.

Here we see another possibility: two connections are serviced concurrently, and only when one of them is done is the third admitted. This is a direct result of the thread pool size being set to 2. For a more realistic use case we'd set the thread pool size much higher, depending on the machine and the exact protocol. This buffering behavior of thread pools is well understood - I've written about it in more detail just a few months ago in the context of Clojure's core.async module.

Summary and next steps

This post discusses multi-threading as a means of concurrency in network servers. The one-thread-per-client approach is presented for an initial discussion, but this method is not common in practice since it leaves the server exposed to resource exhaustion (and hence DoS) under heavy load.

Thread pools are much more common, and most popular programming languages have solid implementations (for some, like Python, it's in the standard library). The thread pool server presented here doesn't suffer from the problems of one-thread-per-client.

However, threads are not the only way to handle multiple clients concurrently. In the next post we're going to look at some solutions using asynchronous, or event-driven programming.


[1]To be fair, modern Linux kernels can tolerate a significant number of concurrent threads - as long as these threads are mostly blocked on I/O, of course. Here's a sample program that launches a configurable number of threads that sleep in a loop, waking up every 50 ms (a minimal sketch of such a program appears after these notes). On my 4-core Linux machine I can easily launch 10000 threads; even though these threads sleep almost all the time, they still consume between one and two cores for the context switching. Also, they occupy 80 GB of virtual memory (8 MB is the default per-thread stack size for Linux). More realistic threads that actually use memory and not just sleep in a loop can therefore exhaust the physical memory of a machine fairly quickly.
[2]Implementing a thread pool from scratch is a fun exercise, but I'll leave it for another day. I've written about hand-rolled thread pools for specific tasks in the past. That's in Python; doing it in C would be more challenging, but shouldn't take more than a few hours for an experienced programmer.
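
For reference, here is a minimal sketch of the kind of thread-launching program described in note [1] above - a configurable number of detached threads, each sleeping 50 ms in a loop. This is an illustrative reconstruction, not the exact program measured there.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

// Each thread does nothing but sleep for 50 ms in an infinite loop.
void* sleeper(void* arg) {
  (void)arg;
  struct timespec ts = {.tv_sec = 0, .tv_nsec = 50 * 1000 * 1000};
  while (1) {
    nanosleep(&ts, NULL);
  }
  return 0;
}

int main(int argc, char** argv) {
  int nthreads = argc >= 2 ? atoi(argv[1]) : 1000;
  for (int i = 0; i < nthreads; ++i) {
    pthread_t t;
    if (pthread_create(&t, NULL, sleeper, NULL) != 0) {
      fprintf(stderr, "failed to create thread %d\n", i);
      return 1;
    }
    pthread_detach(t);
  }
  printf("launched %d threads; watch CPU and memory usage with top\n", nthreads);
  pause();  // keep the process alive while the sleepers run
  return 0;
}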