Clojure concurrency and blocking with core.async



This article is an attempt to dig into the performance problem of concurrent applications using core.async in situations where blocking operations are involved. "Blocking" operations happen when the running program has to wait for something happening outside it; a canonical example is issuing an HTTP request and waiting for the remote server to respond. Such operations are also sometimes called "synchronous".

The core.async library comes with many high-level features like transducers and pipelines; in this article I want to focus on the two fundamental mechanisms it provides for launching a new computation concurrently: threads and go-blocks.

New threads can be created with (thread ...). This call runs the body in a new thread and (immediately) returns a channel to which the result of the body will be posted. Similarly, a go-block is created with (go ...) - it also launches the computation concurrently, but instead of creating a new thread it schedules the computation onto a fixed-size thread pool that the library maintains for all its go-blocks. Most of this article focuses on exploring the differences between these two methods.

The go-block thread pool

In any given executing Clojure process, a single thread pool is dedicated to running all go-blocks. A quick glance at the core.async source code shows that the size of this pool is 8, meaning that 8 OS threads are launched to serve go-blocks [1]. This is the default; it can be modified by setting the clojure.core.async.pool-size property for the JVM running the program. So 8 is the default number of threads core.async has at its disposal to implement its ad-hoc cooperative multitasking.
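For example, here's one way to bump the pool size; this is a sketch assuming a Leiningen project (the property can just as well be passed directly on the java command line), and the project name and versions are only placeholders:

;; project.clj - sketch; name and versions are placeholders.
(defproject async-experiments "0.1.0"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [org.clojure/core.async "0.2.395"]]
  ;; Bump the go-block pool from the default 8 threads to 16:
  :jvm-opts ["-Dclojure.core.async.pool-size=16"])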

Let's start with a cute little experiment to determine the size of the thread pool empirically; this exercise will also shed some light on the effect of blocking calls inside go-blocks:

(defn launch-n-go-blocks
  [n]
  (let [c (async/chan)]
    (dotimes [i n]
      (async/go
        (Thread/sleep 10)
        (async/>! c i)))
    (receive-n c n)))

This function launches n go-blocks, each sleeping for 10 milliseconds and then pushing a number into a shared channel. Then it waits to receive all the numbers from the channel and returns; the effect is to block until all the go-blocks are done. receive-n is a simple helper used throughout this article:

(defn receive-n
  "Receive n items from the given channel and return them as a vector."
  [c n]
  (loop [i 0
         res []]
    (if (= i n)
      res
      (recur (inc i) (conj res (async/<!! c))))))

Now let's call launch-n-go-blocks several times, with an increasing n and observe what happens:
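A minimal sketch of the timing loop (the exact harness used to produce these numbers isn't reproduced in the article, so treat it as an approximation):

(doseq [n (range 1 19)]
  (print (format "Launching %2d -> " n))
  (flush)
  (time (launch-n-go-blocks n)))

The output: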

Launching  1 -> "Elapsed time: 11.403985 msecs"
Launching  2 -> "Elapsed time: 11.050685 msecs"
Launching  3 -> "Elapsed time: 10.37412 msecs"
Launching  4 -> "Elapsed time: 10.342037 msecs"
Launching  5 -> "Elapsed time: 10.359517 msecs"
Launching  6 -> "Elapsed time: 10.409539 msecs"
Launching  7 -> "Elapsed time: 10.543612 msecs"
Launching  8 -> "Elapsed time: 10.429726 msecs"
Launching  9 -> "Elapsed time: 20.480441 msecs"
Launching 10 -> "Elapsed time: 20.442724 msecs"
Launching 11 -> "Elapsed time: 21.115002 msecs"
Launching 12 -> "Elapsed time: 21.192993 msecs"
Launching 13 -> "Elapsed time: 21.113135 msecs"
Launching 14 -> "Elapsed time: 21.376159 msecs"
Launching 15 -> "Elapsed time: 20.754207 msecs"
Launching 16 -> "Elapsed time: 20.654873 msecs"
Launching 17 -> "Elapsed time: 31.084513 msecs"
Launching 18 -> "Elapsed time: 31.152651 msecs"

Ignoring the minor fluctuations in measurements, there's a very clear pattern here; let's plot it:

[Plot: runtime of launching go-blocks with sleeps]

The reason for this behavior is the blocking nature of Thread/sleep. This function blocks the current thread for the specified duration (10 ms in our case); so the go-block executing it will block the thread it's currently running on. This thread is then effectively out of the pool until the sleep finishes. The plot immediately suggests the pool size is 8; as long as 8 or fewer go-blocks are launched, they all finish within ~10 ms because they all run concurrently. As soon as we go above 8, the runtime jumps to ~20 ms because one of the go-blocks will have to wait until there's a free thread in the pool.
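For contrast, here's a hedged sketch of the same experiment using async/timeout instead of Thread/sleep; waiting on a timeout channel with <! parks the go-block rather than blocking the pool thread (see footnote [3]), so the pool-size cliff should disappear:

(defn launch-n-go-blocks-parking
  "Like launch-n-go-blocks, but parks on a timeout channel instead of
  blocking the pool thread with Thread/sleep."
  [n]
  (let [c (async/chan)]
    (dotimes [i n]
      (async/go
        (async/<! (async/timeout 10)) ; parks: the pool thread is freed while waiting
        (async/>! c i)))
    (receive-n c n)))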

Let's try the same experiment using thread instead of go:

(defn launch-n-threads
  [n]
  (let [c (async/chan)]
    (dotimes [i n]
      (async/thread
        (Thread/sleep 10)
        (async/>!! c i)))
    (receive-n c n)))

Here, each time through the loop a new thread is launched, regardless of the number of threads already executing [2]. All these threads can run concurrently, so the runtime plot is:

[Plot: runtime of launching threads with sleeps]

The Clojure documentation and talks / presentations by developers are careful to warn against running blocking operations in go-blocks [3]; it's also not hard to understand why this is so by thinking a bit about the fixed thread-pool based implementation. That said, it's still useful to actually see this in action using an easy-to-understand experiment. In the next section we'll explore the real-life performance implications of blocking inside go-blocks.

Blocking I/O

The sleeping example shown earlier is artificial, but the perils of blocking inside go-blocks are real. Blocking happens quite often in realistic programs, most often in the context of I/O. I/O devices tend to be significantly slower than the CPU executing our program, especially if by "I/O device" we mean a web server located half-way across the world to which we issue an HTTP request.

So the next example is going to be a simple concurrent HTTP client; again, two versions are studied and compared - one with go-blocks, another with threads. For this sample, we'll be using the clj-http library [4], which provides a simple API to issue blocking HTTP requests. The full code is available on Github.

(def url-template "https://github.com/eliben/pycparser/pull/%d")

(defn blocking-get-page [i]
  (clj-http.client/get (format url-template i)))

(defn go-blocking-generator
  [c start n]
  (doseq [i (range start (+ start n))]
    (async/go (async/>! c (blocking-get-page i)))))

When go-blocking-generator is called, it launches n go-blocks, each requesting a different page from pycparser's pull requests on Github. Fetching one page takes between 760 and 990 ms on my machine, depending on the exact page. When run with n=20, this version takes about 2300 ms. Now let's do the same with threads:

(defn thread-blocking-generator
  [c start n]
  (doseq [i (range start (+ start n))]
    (async/thread (async/>!! c (blocking-get-page i)))))

With n=20, this version takes only 1000 ms. As expected, all the threads run at the same time, and most of that time is spent waiting on the remote server. In the go-blocks version, only 8 blocks run concurrently because of the thread pool size; this example should really drive home just how bad blocking I/O inside go-blocks is. Most of the blocks sit there waiting for the thread pool to have a vacant spot, when all they have to do is issue an HTTP request and wait anyway.

Parallelizing CPU-bound tasks

We've seen how go-blocks interact with blocking operations; now let's examine CPU-bound tasks, which spend their time doing actual computations on the CPU rather than waiting for I/O. In an older post, I explored the effects of using threads and processes in Python to parallelize a simple numeric problem. Here I'll be using a similar example: naïvely factorizing a large integer.

Here's the function that factorizes a number into a vector of factors:

(defn factorize
  "Naive factorization function; takes an integer n and returns a vector of
  factors."
  [n]
  (if (< n 2)
    []
    (loop [factors []
           n n
           p 2]
      (cond (= n 1) factors
            (= 0 (mod n p)) (recur (conj factors p) (quot n p) p)
            (>= (* p p) n) (conj factors n)
            (> p 2) (recur factors n (+ p 2))
            :else (recur factors n (+ p 1))))))

It takes around 2.3 ms to factorize the number 29 * 982451653; I'll refer to it as mynum from now on [5]. Let's examine a few strategies of factorizing a large set of numbers in parallel. We'll start with a simple "serial" factorizer, which should also introduce the API:

(defn serial-factorizer
  "Simple serial factorizer."
  [nums]
  (zipmap nums (map factorize nums)))

Each factorizer function in this sample takes a sequence of numbers and returns a new map, which maps a number to its vector of factors. If we run serial-factorizer on a sequence of 1000 mynums, it takes ~2.3 seconds; no surprises here!

Now, a parallel factorizer using go-blocks:

(defn async-go-factorizer
  "Parallel factorizer for nums, launching n go blocks."
  [nums n]
  ;;; Push nums into an input channel; spin up n go-blocks to read from this
  ;;; channel and add numbers to an output channel.
  (let [in-c (async/chan)
        out-c (async/chan)]
    (async/onto-chan in-c nums)
    (dotimes [i n]
      (async/go-loop []
        (when-let [nextnum (async/<! in-c)]
          (async/>! out-c {nextnum (factorize nextnum)})
          (recur))))
    (receive-n-maps out-c (count nums))))

In a pattern that should be familiar by now, this function creates a couple of local channels and spins up a number of go-blocks to read and write from these channels; the code should be self-explanatory. receive-n-maps is similar to the receive-n function we've seen earlier in the article, just with maps instead of vectors.
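receive-n-maps isn't reproduced in the article; a minimal sketch consistent with that description could look like this:

(defn receive-n-maps
  "Receive n single-entry maps from the given channel and merge them into one map."
  [c n]
  (loop [i 0
         res {}]
    (if (= i n)
      res
      (recur (inc i) (merge res (async/<!! c))))))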

Knowing that my machine has 8 CPU threads (4 cores, hyper-threaded), I benchmarked async-go-factorizer with n=8, and it took around 680 ms, a 3.4x speedup over the serial version [6].

Let's try the same with threads instead of go-blocks:

(defn async-thread-factorizer
  "Same as async-go-factorizer, but with thread instead of go."
  [nums n]
  (let [in-c (async/chan)
        out-c (async/chan)]
    (async/onto-chan in-c nums)
    (dotimes [i n]
      (async/thread
        (loop []
          (when-let [nextnum (async/<!! in-c)]
            (async/>!! out-c {nextnum (factorize nextnum)})
            (recur)))))
    (receive-n-maps out-c (count nums))))

The performance is pretty much the same - 680 ms for 1000 numbers with parallelism of n=8.

This is an important point! On purely CPU-bound workloads, go-blocks are no worse than threads, because all the physical cores are kept busy doing useful work at all times. There's no waiting involved, so there's no opportunity to steal an idle core for a different thread. One minor gotcha is to be wary of the go-block thread pool size; if you run your program on a dual-socket machine with dozens of cores, you may want to bump that number up and use a wider parallelism setting.

For completeness (and fun!) let's try a couple more methods of parallelizing this computation. The pattern in these parallel factorizers is so common that core.async has a function for it - pipeline; here's how we use it:

(defn async-with-pipeline
  "Parallel factorizer using async/pipeline."
  [nums n]
  (let [in-c (async/chan)
        out-c (async/chan)]
    (async/onto-chan in-c nums)
    (async/pipeline n out-c (map #(hash-map % (factorize %))) in-c)
    (receive-n-maps out-c (count nums))))

async/pipeline takes an input channel, an output channel and a transducer, as well as the degree of parallelism. It takes care of spinning up go-blocks and connecting all the channels properly [7]. This takes about the same amount of time as the other versions shown earlier, which isn't surprising.

Finally, let's try something slightly different and use Clojure's parallel fold from the clojure.core.reducers library (both fold and transducers are described in my earlier article - check it out!)

(defn conjmap
  ([xs x] (conj xs x))
  ([] {}))

(defn rfold
  "Parallel factorizer using r/fold."
  [nums]
  (r/fold conjmap (r/map #(hash-map % (factorize %)) nums)))

Here we don't have to set the parallelism; r/fold determines it on its own. This approach takes 1.15 seconds on 1000 numbers, quite a bit slower than the earlier attempts. It's entirely possible that the fork-join approach used by r/fold is less efficient than the manual chunking to different threads done by the other versions.
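Incidentally, r/fold splits foldable collections into chunks (512 elements by default) and reduces them on a fork-join pool; the chunk size can also be passed explicitly, which may be worth experimenting with for this kind of workload. A hedged variant of rfold with an explicit chunk size:

(defn rfold-chunked
  "Like rfold, but with an explicit chunk size for r/fold's fork-join splitting."
  [nums chunk-size]
  (r/fold chunk-size conjmap conjmap
          (r/map #(hash-map % (factorize %)) nums)))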

The conclusion from this section, however, should be that for purely CPU-bound tasks it doesn't matter much whether go-blocks or explicit threads are used - the performance should be more-or-less the same. That said, realistic programs don't often spend time purely in CPU-bound tasks; the reality is usually somewhere in between - some tasks do computations, other tasks wait on things. Let's see a benchmark that combines the two.

Combining blocking and CPU-bound tasks

This section shows an artificial benchmark that explores how a combination of blocking and CPU-bound tasks behaves when launched on go-blocks vs. threads. The CPU-bound task will be the same factorization, but this time with a larger number that was carefully tuned to take about 230 ms to factorize on my machine. The blocking "task" will be (Thread/sleep 250). I deliberately chose roughly the same duration for the two kinds of tasks to make comparisons easier, but the principle applies more generally.

Here is the go-block version of the benchmark:

(defn launch-go-blocking-and-compute
  [nblock ncompute]
  (let [c (async/chan)]
    (dotimes [i nblock]
      (async/go
        (Thread/sleep 250)
        (async/>! c i)))
    (dotimes [i ncompute]
      (async/go
        (async/>! c (factorize mynum))))
    (receive-n c (+ nblock ncompute))))

nblock is the number of blocking tasks to launch; ncompute is the number of CPU-bound tasks to launch. The rest of the code is straightforward. You can guess what the threading version looks like by now - check out the full code sample if not.
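For reference, a sketch along those lines (the actual code in the sample may differ in details):

(defn launch-thread-blocking-and-compute
  [nblock ncompute]
  (let [c (async/chan)]
    (dotimes [i nblock]
      (async/thread
        (Thread/sleep 250)
        (async/>!! c i)))
    (dotimes [i ncompute]
      (async/thread
        (async/>!! c (factorize mynum))))
    (receive-n c (+ nblock ncompute))))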

The parameter space here is pretty large; let's try 32 blocking and 16 compute tasks in parallel:

nblock=32, ncompute=16

launch-go-blocking-and-compute: 1521 ms
launch-thread-blocking-and-compute: 530 ms

The larger we set nblock, the worse the situation becomes for the go-block version:

nblock=64, ncompute=16

launch-go-blocking-and-compute: 3200 ms
launch-thread-blocking-and-compute: 530 ms

Up to a point, the threading version is limited only by ncompute, since those tasks actually occupy the CPU cores; all the blocking tasks run in the background and can complete at the same time (after the initial 250 ms).

The go-block version fares much worse, because the blocking tasks can occupy threads while the compute tasks just wait in a queue. Depending on the exact mixture of blocking and compute-bound tasks, this can range from more-or-less the same to extremely bad for the go-blocks version. YMMV!

Managing callback-hell with go-blocks

We've seen the issues that come up when mixing blocking I/O with go-blocks. The reason for this is the cooperative concurrency approach implemented by go-blocks on top of a fixed thread pool. For cooperative concurrency to work well with I/O, the language should either make the scheduler aware of I/O calls (to be able to switch to another context while blocking) or the I/O should be non-blocking. The former requires runtime support in the language, as in Go; the latter is what programming environments like Python (with asyncio) and Node.js (with its fully non-blocking standard library) do. Clojure falls into the latter camp, since core.async is just a library without actual runtime support.

The good news is that non-blocking I/O libraries are very popular these days, and Clojure has a good number of them for all the common tasks you can think of. More good news: core.async's channels make it very easy to deal with non-blocking I/O without sliding into callback hell.

Here's a code sample that uses the asynchronous mode of clj-http to repeat the concurrent HTTP request benchmark:

(defn go-async-generator
  [c start n]
  (doseq [i (range start (+ start n))]
    (clj-http.client/get
      (format url-template i)
      {:async? true}
      (fn [response]
        (async/go (async/>! c response)))
      ;; Exception callback.
      (fn [exc]
        (throw exc)))))

When passed the {:async? true} option, clj-http.client/get does a non-blocking request with a callback for the response (and another callback for an error). Our "response callback" simply spins a go-block that places the response into a channel. Now another go-block (or thread) can wait on the channel to perform the next step; compare that to cascading callbacks!
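For instance, a hypothetical consumer (not part of the article's code) could gather n responses from the channel and collect their HTTP status codes:

(defn consume-statuses
  "Take n responses from channel c and return their HTTP status codes."
  [c n]
  (async/<!!
    (async/go-loop [i 0
                    statuses []]
      (if (= i n)
        statuses
        (recur (inc i) (conj statuses (:status (async/<! c))))))))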

The performance is good too - when run with multiple requests in parallel, this version runs as fast as the thread-launching example from earlier in the article (the full code is here). All the get requests are launched one after another, with no blocking. When the results arrive, go-blocks patiently "park" while sending them into a channel, but this is an explicit context-switch operation, so all of them peacefully run concurrently on the underlying thread pool.

Conclusion

Would launching a thread inside the callback work as well in the last example? Yes, I think it would. So why use go-blocks?

The reason is scalability. Launching threads is fine as long as you don't have too many, and as long as the latency of the launch is not too important. Threads are OS constructs and have fairly heavy resource requirements - in terms of memory consumption and context-switching time. go-blocks are extremely lightweight in comparison.

Therefore, if you want to serve thousands of connections concurrently from a single machine - go-blocks are the way to go, combined with non-blocking APIs. Note that go-blocks use a thread pool that can use multiple cores, so this isn't just a single-core concurrent multitasking solution (such as you may encounter in Node.js or Python's asyncio).

If the number of concurrent tasks is not too large or blocking I/O is involved, I'd recommend using async/thread. It avoids the pitfalls of blocking I/O, and in other cases performance is the same. core.async's wonderful tools like channels and alts!! are still available, making concurrent programming much more pleasant.
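For example, alts!! lets a consumer take whichever of several results is ready first, or give up after a timeout. A small sketch, reusing blocking-get-page from the HTTP section above (purely illustrative):

;; Wait for a page fetched on a separate thread, but give up after 2 seconds.
(let [result-c (async/thread (blocking-get-page 1))
      [val port] (async/alts!! [result-c (async/timeout 2000)])]
  (if (= port result-c)
    (println "got response, status:" (:status val))
    (println "timed out")))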

However, note that Clojure is a multi-environment language, and in some environments (most notably ClojureScript), threads are simply unavailable. In these cases using go-blocks is your only chance at any kind of reasonable concurrency (the alternative being callbacks).

Another use case for go-blocks is implementing coroutines, which can be useful in some cases - such as agents in games, or as a replacement for complex state machines. But here again, beware of the actual scale. If it's possible to use threads, just use threads. go-blocks are trickier to use correctly, and one always has to be aware of what may block, lest performance be dramatically degraded.

If there's something I'm missing, please let me know!


[1]This is for the standard JVM implementation of Clojure; in ClojureScript there would just be a single thread, since JS doesn't support in-browser threads (yet).
[2]The JVM also has some limit on the number of threads it runs at the same time, but it's fairly high so we'll ignore it here.
[3]

W.r.t. transgressions committed in the code samples shown here, Thread/sleep is a big no-no inside go-blocks. By now I hope that it's obvious why. timeout is the right function for "waiting" inside go-blocks, since it "parks" the go-block rather than blocking it. Parking is go-block friendly since it actually frees up the thread the go-block is running on. Similarly, >! is the right channel sending function to use inside go-blocks; >!! blocks the whole thread.

This is also a good place to mention that similar thread pool size "artifacts" can be found in Node.js, which uses libuv to handle events. libuv uses its own thread pool to execute blocking calls, thus giving the calling application a sense of concurrency (up to some thread pool size).

[4]This sample is inspired by Martin Trojer's blog post, which is the best introduction to the issues with blocking I/O in go-blocks I found before starting this article.
[5]Why this particular number? The second factor is a large-ish prime, so it will make the factorizer sweat a bit (by iterating over all the odd numbers up to its square root); the multiplication by another (prime) factor ensures more of the paths in the factorize function are exercised.
[6]Benchmarking multi-core CPU performance with modern CPUs is notoriously tricky; CPUs regulate their frequency based on load, so it's entirely possible that a single core runs faster than each one core in a group of 4; also, hyper-threading reuses some CPU resources within each core so its speedup is rarely linear.
[7]Note that pipeline spins up go-blocks by default, so the cautions explored in this article apply. There's also pipeline-blocking if you need blocking operations. Looking at the implementation of pipeline is actually pretty illuminating, and should be easy to understand given what we discuss here.

Reducers, transducers and core.async in Clojure



This is a whirlwind tour of some of the cool new features that appeared in Clojure in the past few years. I find it fascinating how one good idea (reducers) morphed into another (transducers), and ended up mating with yet another, apparently unrelated concept (concurrent pipelines) to produce some really powerful coding abstractions.

The article is not for beginners; otherwise it'd take a small book to cover all this material. Some experience with Clojure or a similar functional language is required.

Sequences, iterators and laziness

Let's start with the basics. Suppose we want to iterate over a sequence of items, performing some combination of transforming and filtering on the items in it. Here's an example:

=> (def s (range 0 10))
#'simplerepl.core/s
=> (reduce + (map inc (filter even? s)))
25

It's very simple, but our imagination and experience tell us that it is representative of many real data processing pipelines. Mapping, filtering and reducing is the bread and butter of functional programming, after all. However, our eternal concern with efficiency should sound the sirens here; it seems like we take a sequence and pull it through several operations that need the whole sequence to operate. Are there copies involved? The way the Clojure code is written above, the answer is no - because of laziness.

range returns a lazy sequence - a sequence whose members are not really materialized until we try to use them. Further, Clojure's functional primitives like map and filter respect laziness by not materializing more than they need for any given step (sans a bit of chunking/buffering for efficiency).

In other words, the code above does not create intermediate sequences between steps. It creates lazy sequences, which are just thunks holding a function to materialize the next item.

Python has iterators for the same purpose. When writing a function to generate a sequence of items in modern Python, returning a list is discouraged because this may incur unnecessary copying. Rather, such functions ought to return iterators (which is very easy using yield). Moreover, functions consuming sequences should be careful about not materializing the whole sequence but rather using elementwise iteration (which is the default in for...in loops). Python 3 made this style prevalent by switching range to return an iterable; same for map. In Python 2 both range and map return fully materialized lists.

In an imperative programming style, we'd probably have a loop to perform the operation shown above:

def process(s):
    result = 0
    for i in s:
        if i % 2 == 0:
            result += i + 1
    return result

It's easy to reason about the efficiency of this code, but it's also structured in a less modular way. A composition of maps, filters and reductions could be easier to reason about because it maps well to the actual business logic of our application. The imperative implementation does not compose as well.

Reducers

[Image: pipe reducers]

Alright, so laziness ensures that Clojure code as shown above is not as inefficient as we may have feared. But still, is it as efficient as an imperative implementation? It turns out the answer is no; while laziness avoids large copying costs, it does incur a constant overhead for boxing and unboxing thunks representing the rest of the sequence; it would be nice to avoid these costs as well. This brings us to reducers.

Expressing data transformation with reducers starts with the observation that reduce is a fundamental building block that can express other transformations fairly easily. Here's how we can perform the job of a map with reduce:

=> (reduce (fn [acc item] (conj acc (inc item))) [] [1 2 3 4 5])
[2 3 4 5 6]

filter is only slightly trickier:

=> (reduce (fn [acc item] (if (even? item)
#_=>                          (conj acc item)
#_=>                          acc))
#_=>         [] [1 2 3 4 5])
[2 4]

This is cool, but note something slightly disturbing. This code relies on conj doing the right thing for the collection at hand; on lists, for example, conj prepends, so the result would come out in the wrong order. Also, we've just spent time talking about not building temporary collections in between transformations, but this is exactly what this code is doing with that conj.

The clojure.core.reducers library solves both these problems by conceptually inverting the process of applying multiple transformations to a sequence inside-out. Reducers are now integrated into Clojure core, but it's worth spending a few minutes getting an intuitive feel for how they work by implementing a simplistic variant. For the full scoop, read Rich Hickey's article. Here's an abridged version.

We start by defining a "reducing function". A reducing function is what reduce takes - a function accepting an accumulator and a new item, and returning the new accumulator value. In classical reductions this function can just be +; in the sense of implementing-map-with-a-reduce, it can be as shown above. In pseudo-type-notation, it's something like [1]:

reducingf :: acc -> item -> acc

Next, let's define the concept of "transforming" a reducing function. This is simply a function that takes a reducing function and returns another reducing function:

transformingf :: (acc -> item -> acc) -> (acc -> item -> acc)

The main insight is that we can express all kinds of data transformations simply by transforming a reducing function. Eventually we end up with a single reducing function that can be passed to reduce.

Here's another take at representing map with reduce, this time using the generalized approach described above:

(defn mapping-transform
  [mapf]
  (fn [reducingf]
    (fn [acc item]
      (reducingf acc (mapf item)))))

It may look scary, but it's just a code embodiment of the textual description above. A call to mapping-transform creates a function that transforms a reducing function into another reducing function. The actual parameter passed to mapping-transform is used to modify every item in the collection before that item is passed to the original reducing function. Here's how we can use this to compute a sum of squares for a given vector:

reducers.core=> (reduce ((mapping-transform #(* % %)) +) 0 [1 2 3 4 5 6])
91

There's nothing more magical than some higher-order function munging going on here. (mapping-transform #(* % %)) returns a function that takes a reducing function as argument, and returns another reducing function. Since the actual reduction we want to perform is +, this is what we pass in [2]. The returned reducing function is then given to reduce. Take a couple of minutes to convince yourself that this works. It may also help to trace the + in the REPL:

reducers.core=> (trace-vars +)
#'clojure.core/+
reducers.core=> (reduce ((mapping-transform #(* % %)) +) 0 [1 2 3 4 5 6])
TRACE t8399: (clojure.core/+ 1 4)
TRACE t8399: => 5
TRACE t8400: (clojure.core/+ 5 9)
TRACE t8400: => 14
TRACE t8401: (clojure.core/+ 14 16)
TRACE t8401: => 30
TRACE t8402: (clojure.core/+ 30 25)
TRACE t8402: => 55
TRACE t8403: (clojure.core/+ 55 36)
TRACE t8403: => 91
91

Question: how many times do we process each item in the input vector? Note that we do two data transformation operations:

  1. Map the square function #(* % %) over each item.
  2. Sum all squared items together.

However, this code only walks over the input sequence once. What happens here is that, instead of generating a new lazy thunk after each step, we combine all steps into a single traversal. This combination is achieved via a composition of functions orchestrated by mapping-transform. We can take this approach further, and define a filtering-transform:

(defn filtering-transform
  [predicate]
  (fn [reducingf]
    (fn [acc item]
      (if (predicate item)
        (reducingf acc item)
        acc))))

Here's how we sum up the all the even numbers in a vector:

reducers.core=> (reduce ((filtering-transform even?) +) 0 [1 2 3 4 5 6])
12

Now it's time to go back to the first example we started the article with. Let's take all the even numbers in a sequence, increment them and sum them up, using the higher-order reducing transforms. Here goes:

reducers.core=> (reduce ((filtering-transform even?)
           #_=>            ((mapping-transform inc) +)) 0 (range 0 10))
25

Remember the mention of inverting the transformation process inside-out? The order of transformations is inverted from the usual Clojure function application order. We first filter the evens, then increment, then add them all up. The "standard" Clojure code is:

(reduce + (map inc (filter even? s)))

But we've flipped it, with even? on the outside and + on the inside (with inc still in-between), due to the way our mapping-transform and filtering-transform are defined. This inverted order doesn't really matter, and the actual Clojure reducers library lets us write the steps in the more expected order, as we will soon see [3].

What really matters is that this approach only walks the input sequence once, without any temporary sequences or lazy thunks in the interim. It really is similar to the imperative-style loop [4].

Is it faster in practice?

Let's leave this explanatory framework behind and just use clojure.core.reducers, which exports its own versions of map and filter designed to be passed to reduce. These functions do essentially the same thing as the trick explained above, but with a bit more sophistication, so that invoking them looks exactly like invoking the built-in map and filter.

To use them in the REPL, we'll first require the module:

reducers.core=> (require '[clojure.core.reducers :as r])
nil

Now we can distinguish between the built-in map and the transforming r/map. Here is a benchmark that compares the two ways of transforming collections [5]:

reducers.core=> (def s (range 0 9999999))
#'reducers.core/s
reducers.core=> (time (reduce + 0 (map inc (filter even? s))))
"Elapsed time: 599.985027 msecs"
25000000000000
reducers.core=> (time (reduce + 0 (r/map inc (r/filter even? s))))
"Elapsed time: 432.453733 msecs"
25000000000000

The reducers version is quite a bit faster. As an exercise, try to make the transforming chain longer by adding several more mapping and filtering steps. How does it affect the performance difference?
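One way to try this exercise - the extra steps here are arbitrary and purely illustrative:

(time (reduce + 0 (map inc (filter even? (map #(* 3 %) (filter pos? s))))))
(time (reduce + 0 (r/map inc (r/filter even? (r/map #(* 3 %) (r/filter pos? s))))))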

Great, so we can speed up our data processing with this one weird trick. However, if you have some experience in working on compiler backends, you may be feeling underwhelmed. Isn't this just loop fusion? Indeed, it is. Our original code had multiple loops going over the same data; but we could just fuse all the operations done on every element into a single loop. Indeed, this is what the imperative code in the beginning of the article does.

I suspect that Clojure is too dynamic and there are too many layers of abstraction (such as laziness, sequences, etc) to expect a perfect automatic loop fusion from the JVM here. This is why we resort to fusing the loops manually; well, not really manually - we actually use some higher-order function goodness to accomplish this for us. If you squint hard at the mapping-transform function above, you may notice that it fuses the mapping function into the reducing function.

All of this makes me wonder - what if we really fuse the loops manually, can we go even faster? It turns out yes, but only for some types of sequences. Let's start by changing the s in the benchmark to a materialized vector (range actually produces a lazy sequence):

reducers.core=> (def sv (vec s))
#'reducers.core/sv

And now let's re-run the benchmark, comparing the regular filter-map-reduce with the reducers-infused filter-map-reduce:

reducers.core=> (time (reduce + 0 (map inc (filter even? sv))))
"Elapsed time: 555.125033 msecs"
25000000000000
reducers.core=> (time (reduce + 0 (r/map inc (r/filter even? sv))))
"Elapsed time: 371.145887 msecs"
25000000000000

But now let's add another contender - a manually fused reducing function that combines the addition, filtering and increment mapping:

reducers.core=> (time (reduce (fn [acc item] (if (even? item)
                                               (+ acc (inc item))
                                               acc))
                               0 sv))
"Elapsed time: 324.793784 msecs"
25000000000000

Hah, even faster! Of course, writing such pipelines manually is not always practical, isn't composable and quickly gets unreadable, so in most cases using reducers is preferred. But that's not all. Reducers have another trick up their sleeve - effortless parallelism.

Folding in parallel

Let's begin this section right with the punchline:

reducers.core=> (time (r/fold + (r/map inc (r/filter even? sv))))
"Elapsed time: 145.529636 msecs"
25000000000000

Whoa, what happened? Given an appropriate collection (such as a vector) that is foldable, and an appropriate reducing function that is associative, we can actually perform a reduction in parallel on multiple cores. The trick is breaking the input sequence into chunks, reducing each chunk and then reducing the results of the chunks. For foldable collections and associative reducing functions this is mathematically equivalent to the original task; and the big win is that we can reduce separate chunks in parallel, on different CPU cores. Which is exactly what r/fold does for us, automatically. Even though the operation done on each item is trivial, r/fold produces a 2x speedup on my 4-core (8-thread) CPU. For longer operations, the speedup could be even better.

How awesome is that? We barely changed the code and got considerably better performance by leveraging parallelism. All of this is possible due to the abstraction created by the reducers library. Remember the part about decoupling the actual collection from the operations that reduce it? This comes in useful here.

Transducers

Remember our discussion of transforming reducing functions above? A transforming function (such as mapping-transform) has the pseudo-type:

transformingf :: (acc -> item -> acc) -> (acc -> item -> acc)

It takes a reducing function and returns another reducing function. We've just seen how this concept is used in the reducers library to decouple the transformation from the actual process of reduction.

Shortly after introducing reducers, the designers of Clojure had another insight. Such transforming functions are useful not just in the context of reductions. The Clojure core developers were designing utility functions for core.async to transform channels (more on this later), and found themselves rewriting a bunch of logic that already existed in the sequence mapping and filtering functions. The solution? Use the concept of transforming functions to define sequence transformations in a more abstract way, completely decoupled from the underlying sequences.

So transforming functions got a new name - transducers - and got integrated more tightly into the language. Many of the built-in sequence-processing functions like map now have an additional arity where no sequence is actually passed in. (map inc), for example, simply returns a transducer - a function that transforms reducing functions. These transducers are composable, so another way to write the non-parallel reduction we've been using in the benchmarks is:

(reduce ((comp (filter even?) (map inc)) +) 0 sv)

Note that the order here is similar to the one we had to apply with our own filtering-transform and mapping-transform, to convey that we first filter and then map (it's also similar to the Clojure -> macro).
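For completeness, transduce is the usual entry point for reducing a collection through a transducer; this isn't from the original examples, but it computes the same thing as the reduce above (assuming the same sv):

(transduce (comp (filter even?) (map inc)) + 0 sv)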

Armed with the insights of this article, we can actually look under the hood of Clojure's built-in map:

(defn map
  ([f]
    (fn [rf]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
           (rf result (f input)))
        ([result input & inputs]
           (rf result (apply f input inputs))))))
  ([f coll]
   ;; Implementation of other, non-transducer arities.
   ;; ...

To be a true transducer, a Clojure function must have three arities. I'll skip the no-argument and single-argument versions (please read a more comprehensive reference on transducers for the full scoop). The 2-arity version is where the real meat is and, oh wait, it's exactly the same as our mapping-transform from this article!

So OK, now "functions that transform reducing functions" have a shorter name - transducers; but what has actually changed?

What's really changed is that the map transducer has a wider range of applicability than just mapping collections. Its implementation has no collection-specific code. Let's see how it can be used to transform communication channels.

Pipelines and transducers

Clojure has borrowed go-routines and channels from Go, and placed them in a standard library module named core.async. Unfortunately, a thorough discussion of core.async is outside the scope of this article. I'll just say that core.async is awesome; that it was implemented as a library without changing the language is beyond awesome - it's a terrific example of the power of Lisps to grow the language towards the problem domain.

That said, I do want to mention core.async in the context of transducers, since the two mix in an interesting way.

Let's take the simple squaring pipeline from Go concurrency patterns and rewrite it in Clojure. The first version is a pretty-much verbatim transcription:

(defn gen-1
  [& nums]
  (let [c (async/chan)]
    (async/go
      (doseq [n nums]
        (async/>! c n))
      (async/close! c))
    c))

(defn sq-1
  [cin]
  (let [cout (async/chan)]
    (async/go-loop [n (async/<! cin)]
      (if n
        (do
          (async/>! cout (* n n))
          (recur (async/<! cin)))
        (async/close! cout)))
    cout))

(defn main-1
  []
  (let [c-gen (gen-1 2 3 4 5)
        c-sq (sq-1 c-gen)]
    (loop [n (async/<!! c-sq)]
      (when n
        (println n)
        (recur (async/<!! c-sq))))))

There are two pipeline stages; the first, gen-1 generates a sequence of numbers into a channel. The second, sq-1 takes a channel of inputs, transforms them (by squaring each number) and puts the results in another channel. main-1 connects the two pipeline stages together.

The second version makes more use of higher-level core.async tools:

(defn gen-2
  [& nums]
  (async/to-chan nums))

(defn sq-2
  [cin]
  (async/map< #(* % %) cin))

(defn main-2
  []
  (let [c-gen (gen-2 2 3 4 5)
        c-sq (sq-2 c-gen)]
    (loop [n (async/<!! c-sq)]
      (when n
        (println n)
        (recur (async/<!! c-sq))))))

In gen-2, to-chan places its input collection into a channel, and closes the channel; all of this in a separate go-block, of course. sq-2 uses the map< function to create a mapping channel which takes items from its input channel, maps some function on them and returns a channel of results.

If you read the documentation of core.async, you'll notice that map< is now deprecated and you're advised to "use transducers instead". Let's see how to do that, in this third version:

(defn main-3
  []
  (let [c-sq (async/chan 1 (map #(* % %)))]
    (async/onto-chan c-sq [2 3 4 5])
    (loop [n (async/<!! c-sq)]
      (when n
        (println n)
        (recur (async/<!! c-sq))))))

Here we create a single channel named c-sq, with a mapping transducer. This means that every item going through the channel gets transformed with the given mapping function before being read out of the channel. We don't need a separate channel, and core.async doesn't need a separate mapping helper.

In fact, with the introduction of transducers core.async deprecated a whole bunch of functions: map<, filter<, unique, partition and so on. While implementing these for channels, the Clojure core devs had the epiphany that they were basically reimplementing all the sequence-processing functions in a different context [6]. Transducers are an elegant solution that abstracts the concept of transformation away from the underlying context (be it collections, channels, or...)

It's easy to compose transducers on channels. Here's a silly example:

(defn square [x] (* x x))

(def xform
  (comp
    (filter even?)
    (filter #(< % 10))
    (map square)
    (map inc)))

(def c (async/chan 1 xform))

(async/go
  (async/onto-chan c [5 6 8 12 15]))
(loop [n (async/<!! c)]
  (when n
    (println n)
    (recur (async/<!! c))))

Here the transducer is more complex, applying several filtering and mapping steps on the items going through the channel. That said, as we've seen earlier in the article this is actually pretty efficient, with no unnecessary copying involved.
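Because the transducer carries no channel-specific (or collection-specific) code, the very same xform can be reused in other contexts; a small illustration, not from the original code, applying it to a plain vector with into:

(into [] xform [5 6 8 12 15])
;; => [37 65]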

Closing thoughts - on the generality of reducing functions

It's interesting to ponder how reducing functions, from the humble beginning of the workhorse of reduce, became the most important building block of the concepts discussed in this article. Recall that a reducing function has the type:

reducingf :: acc -> item -> acc

I think the key insight is that taking and returning the new combined value permits a reducing function to implement things like filtering, because we can convey the concept of including or not including the current item in the combined result. Clojure's reductions also support the reduced call for early termination, and keeping state between reductions can help implement more involved processing steps. For a fairly complete example of all these tools, take a look at the implementation of the transducer variant of take in the Clojure core library sources.
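As a tiny illustration of reduced (not from the article's code), here's a reduction that terminates early as soon as it finds what it's looking for:

;; Stop reducing as soon as the first item greater than 100 is seen.
(reduce (fn [acc item]
          (if (> item 100)
            (reduced item) ; short-circuits the rest of the sequence
            acc))
        nil
        (range 1000))
;; => 101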

The code snippets shown throughout the article are available on Github.


[1]This is a Haskell-y type notation. x -> y -> z means a function taking a parameter of type x and a parameter of type y and returning a value of type z. (x -> y) -> z means we take one parameter of type "function taking x and returning y" and return a value of type z.
[2]Exercise: modify this code to produce a new vector of squares, instead of summing them up. Your solution will still use reduce.
[3]The actual reducers library cleverly uses Clojure protocols to make this process even more abstract and let sequences decide the best way to reduce them. It's worth checking out, though there's quite a bit of added complexity that obscures away the main point I want to make in this article.
[4]

We have to give up laziness though, since Clojure's reduce is inherently eager. It implements left-folding rather than right-folding, so it can't be run on infinite sequences.

This is usually not a very big deal; while infinite sequences are a nice abstraction in some cases, most of the real data processing tasks we have are, luckily, finite.

[5]Benchmarking in Clojure is tricky since the JVM needs some warmup for the heavy-handed JIT to kick in, so it's worth rerunning such benchmarks several times and collecting the fastest runtime. The numbers shown here are representative of the results obtained on my machine.
[6]There's a small caveat here to be aware of, IMHO. Pipelines as demonstrated in the Go article linked here aren't only useful to decouple the different steps. They are also useful to actually parallelize them. Squaring is a toy example, but imagine this step in the pipeline was time consuming. Then the go-routines running the generating step (or some other pipeline step) could actually run in parallel with the squaring step. When we use transducers as shown here, this flexibility goes away.

Notes on debugging Clojure code



Clojure is a great programming language, but a recurring complaint one keeps hearing from developers hacking on Clojure code is that debugging can be unpleasant. First of all, I agree! Debugging Clojure code can be more daunting on average than, say, debugging Python code. This is mainly due to two reasons:

  1. Clojure's Java legacy. Clojure is compiled to Java bytecode, which has some terminology and idiosyncrasies Clojure programmers aren't always familiar with. These terms tend to pop up in stack traces and cause confusion (e.g. IFN).
  2. Clojure - being a Lisp - has a certain code structure which is different from, say, a more common imperative coding style. Rather than being a sequence of statements, Clojure programs tend to involve long call chains of nested expressions. When only part of an expression fails, it's often non-trivial to figure out where and why.

In this post I want to share some notes from my own experience debugging Clojure programs.

Dealing with Clojure's cryptic exceptions

The first problem with Clojure's runtime exceptions is that we usually don't get to see the full stack trace by default. Let's say we have this silly, nonsensical function in a file called sample.clj:

(defn foo
  [n]
  (cond (> n 40) (+ n 20)
        (> n 20) (- (first n) 20)
        :else 0))

Then, to see how it works, we load the file into the REPL and type the following [1]:

debugging.core=> (foo 24)
IllegalArgumentException Don't know how to create ISeq from: java.lang.Long
  clojure.lang.RT.seqFrom (RT.java:542)

Uh oh. There are two problems here. First, what does this error message mean? What's ISeq and what's java.lang.Long? Second, it's not clear where it is actually failing (thanks for that pointer to RT.java though, Clojure!) Let's address the second problem first. The magic incantation to show the stack trace of the last exception is calling the pst function:

debugging.core=> (pst)
IllegalArgumentException Don't know how to create ISeq from: java.lang.Long
  clojure.lang.RT.seqFrom (RT.java:542)
  clojure.lang.RT.seq (RT.java:523)
  clojure.lang.RT.first (RT.java:668)
  clojure.core/first--4339 (core.clj:55)
  clojure.core/first--4339 (core.clj:55)
  debugging.sample/foo (sample.clj:10)
  debugging.sample/foo (sample.clj:7)
  debugging.core/eval13715 (form-init6539101589609174055.clj:1)
  debugging.core/eval13715 (form-init6539101589609174055.clj:1)
  clojure.lang.Compiler.eval (Compiler.java:6927)
  clojure.lang.Compiler.eval (Compiler.java:6890)
  clojure.core/eval (core.clj:3105)

This is much better, because at least some files in this trace are familiar. core.clj is not our core.clj; it's Clojure's core library. But sample.clj is our file, and we can infer that on line 10 we call clojure.core/first and something goes wrong. Line 10 happens to be:

(> n 20) (- (first n) 20)

So now things become clearer. The call (first n) must be bad, and bad in a way that tries to coerce Clojure into creating an ISeq from a Long. In other words, we're passing a number into a function that expects a sequence, and this is, indeed, bad. Learning to map from Clojure values and types to the JVM's expectations will take time and grit - especially if you (like me) don't have much Java experience. I suggest doing a bit of reading on Clojure/Java interoperability, and about other Java-isms Clojure inherits; it ain't pretty, and you may not always want to use it, but being familiar with the terms can go a long way in deciphering cryptic stack traces.

For a more detailed treatment of this debugging issue I highly recommend Aphyr's article on debugging Clojure.

Finding which form an exception comes from

Let's invoke the foo function in a different way that demonstrates another issue with debugging Clojure:

debugging.core=> (foo nil)

NullPointerException   clojure.lang.Numbers.ops (Numbers.java:1013)

OK, we know what to do next:

debugging.core=> (pst)
NullPointerException
  clojure.lang.Numbers.ops (Numbers.java:1013)
  clojure.lang.Numbers.gt (Numbers.java:229)
  clojure.lang.Numbers.gt (Numbers.java:3864)
  debugging.sample/foo (sample.clj:9)
  debugging.sample/foo (sample.clj:7)
  debugging.core/eval14693 (form-init6539101589609174055.clj:1)
  debugging.core/eval14693 (form-init6539101589609174055.clj:1)
  clojure.lang.Compiler.eval (Compiler.java:6927)
  clojure.lang.Compiler.eval (Compiler.java:6890)
  clojure.core/eval (core.clj:3105)
  clojure.core/eval (core.clj:3101)
  clojure.main/repl/read-eval-print--7408/fn--7411 (main.clj:240)

So the exception comes from line 9, which is:

(cond (> n 40) (+ n 20)

This exception also tells us it comes from clojure.lang.Numbers.gt from which we can infer it's the > operator that is complaining. But imagine for a second that we had two forms with the same operator on that line:

(cond (> c 40) (* (+ n 20) (+ m 30)))

If we got a NullPointerException about an addition, we wouldn't know which one fails. Luckily, there's a very useful contrib library that helps with debugging - tools.trace. In this particular case, we'd use the trace-forms macro, which tells us which nested form (expression) is failing.
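Since tools.trace is a contrib library rather than part of clojure.core, it has to be added as a dependency and required first; a hedged sketch (the version is just an example):

;; Add [org.clojure/tools.trace "0.7.9"] (or a newer release) to the project
;; dependencies, then:
(require '[clojure.tools.trace :refer [trace-forms trace-vars]])

With that in place, we can modify our function to be: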

(defn foo
  [n]
  (trace-forms (cond (> n 40) (+ n 20)
                     (> n 20) (- (first n) 20)
                     :else 0)))

And now when called with nil, we get:

debugging.core=> (foo nil)
NullPointerException : No message attached to throwable java.lang.NullPointerException
  Form failed: (> n 40)
  Form failed: (if
 (> n 40)
 (+ n 20)
 (clojure.core/cond (> n 20) (- (first n) 20) :else 0))
  Form failed: (cond (> n 40) (+ n 20) (> n 20) (- (first n) 20) :else 0)
  clojure.lang.Numbers.ops (Numbers.java:1013)

Neat, huh? trace-forms breaks the form it traces into all its nested forms and reports precisely which one failed - propagating this information upwards towards the top form [2]. trace-forms is very useful when errors manifest as exceptions.

Unfortunately, this isn't sufficient for all cases. Our foo wasn't designed to handle nils, and the bug here is in the place where the nil came from. This may be quite a bit removed - and not on the same stack trace - from where foo is invoked. We'll get an exception when foo is called, but the real challenge is to find where the nil came from. More generally, bugs that manifest as thrown exceptions are the easier kind of bugs. The more insidious bugs hide in programs that run just fine end-to-end but compute slightly incorrect results.

Tracing and logging

This gets us into the more general domain of debugging, where the tricks and tools programmers use are as varied as the bugs hiding in our programs. When it comes to debugging, I'm firmly in the printf camp; I rarely prefer debuggers over printf-based debugging [3], and Clojure is no exception. In fact, due to the way Clojure programs look (nested forms), I find that debuggers are even less useful in Clojure than in other languages. On the other hand, Clojure's macros make it possible to trace / print stuff in a very nice way.

For example, I find that it's useful to be able to turn debugging printouts on and off frequently. So I have this trusty code in my utilities:

(def ^:dynamic *verbose* false)

(defmacro printfv
  [fmt & args]
  `(when *verbose*
     (printf ~fmt ~@args)))

Calls to printfv can be freely scattered around the code; by default, they will not print anything. When I do want to see what these printfvs have to say, another macro comes in handy:

(defmacro with-verbose
  [& body]
  `(binding [*verbose* true] ~@body))

Here's how it works. Suppose we've written this factorial function, with a debugging printout:

(defn factorial
  [n]
  (printfv "factorial: %d%n" n)
  (if (< n 1)
    1
    (* n (factorial (- n 1)))))

Now, if we just call it as usual from the REPL, we get:

debugging.core=> (factorial 6)
720

But if we want to actually see the debugging output, we call:

debugging.core=> (with-verbose (factorial 6))
factorial: 6
factorial: 5
factorial: 4
factorial: 3
factorial: 2
factorial: 1
factorial: 0
720

This optional verbosity is perfect when you're in the middle of a furious bug hunt, adding printfvs in many places in your code. with-verbose can turn verbose logging on selectively and control the amount of debugging spew [4].

This example brings us back to the tools.trace library, which provides another awesome tool that helps trace function calls (the bread and butter of Clojure programs). Enter trace-vars. After importing it, all we need to do is invoke it on any functions we want traced; for example:

debugging.core=> (trace-vars factorial)
#'debugging.core/factorial

And now invoking our factorial produces:

debugging.core=> (factorial 6)
TRACE t16315: (debugging.core/factorial 6)
TRACE t16316: | (debugging.core/factorial 5)
TRACE t16317: | | (debugging.core/factorial 4)
TRACE t16318: | | | (debugging.core/factorial 3)
TRACE t16319: | | | | (debugging.core/factorial 2)
TRACE t16320: | | | | | (debugging.core/factorial 1)
TRACE t16321: | | | | | | (debugging.core/factorial 0)
TRACE t16321: | | | | | | => 1
TRACE t16320: | | | | | => 1
TRACE t16319: | | | | => 2
TRACE t16318: | | | => 6
TRACE t16317: | | => 24
TRACE t16316: | => 120
TRACE t16315: => 720
720

We get to see the full call tree, including values of parameters and what each call returns. It even works for mutually-recursive functions:

(declare isodd?) ; forward declaration, so iseven? can refer to isodd? below

(defn iseven?
  [n]
  (if (= n 0)
    true
    (isodd? (- n 1))))

(defn isodd?
  [n]
  (if (= n 0)
    false
    (iseven? (- n 1))))

Let's try it:

debugging.core=> (trace-vars iseven? isodd?)
#'debugging.core/isodd?
debugging.core=> (iseven? 7)
TRACE t16332: (debugging.core/iseven? 7)
TRACE t16333: | (debugging.core/isodd? 6)
TRACE t16334: | | (debugging.core/iseven? 5)
TRACE t16335: | | | (debugging.core/isodd? 4)
TRACE t16336: | | | | (debugging.core/iseven? 3)
TRACE t16337: | | | | | (debugging.core/isodd? 2)
TRACE t16338: | | | | | | (debugging.core/iseven? 1)
TRACE t16339: | | | | | | | (debugging.core/isodd? 0)
TRACE t16339: | | | | | | | => false
TRACE t16338: | | | | | | => false
TRACE t16337: | | | | | => false
TRACE t16336: | | | | => false
TRACE t16335: | | | => false
TRACE t16334: | | => false
TRACE t16333: | => false
TRACE t16332: => false
false

Note how easy it is to see what calls what. Quite often, bugs are uncovered simply by carefully studying the chain of function calls some input tickles in our code, and trace-vars is a very low-effort way to enable this kind of debugging.

Deeper tracing inside cond forms

Tracing function calls is great, but sometimes insufficient. It's not uncommon to have cond forms in functions, and sometimes it's pretty hard to know which condition was actually "taken" (this isn't always easy to infer from the return value of the function). We've seen how to explore where exceptions come from with trace-forms, but exceptions are just one kind of problem. The more difficult problem arises when the code throws no exceptions but still produces a wrong value.

I've mentioned how Clojure's macro superpowers let us write very powerful debugging tools. What follows is another example.

Consider this toy code:

(cond (> 10 20) (+ 10 20)
      (> 20 10) (- 20 10)
      :else 200)

It happens to return 10 since the second condition fires. But suppose it stands for a much more complicated cond where it's not obvious which condition was taken and where the return value came from. How do we go about debugging this?

Well, we can always add a printfv into every result expression (possibly wrapping in a do form) and see what fires. This would work, but it's quite tiresome, especially for large conds. To do this automatically, we can write the following macro:

(defmacro condv
  [& clauses]
  (when clauses
    (list
     'if
     (first clauses)
     (if (next clauses)
       `(do (println (str "condv " '~(first clauses)))
            ~(second clauses))
       (throw (IllegalArgumentException.
               "cond requires an even number of forms")))
     (cons 'condv (next (next clauses))))))

It behaves just like cond, while also printing out the condition that fired. If we replace the cond in the original example with condv and evaluate it, we'll get:

debugging.core=> (condv (> 10 20) (+ 10 20)
            #_=>        (> 20 10) (- 20 10)
            #_=>        :else 200)
condv (> 20 10)
10

Note the printout before the return value of 10: condv (> 20 10) - it shows us exactly which condition was taken.

Conclusion

While beginning Clojure programmers may find the debugging experience challenging, I believe that with some effort and perseverance it's possible to get used to the unusual environment and even reach new levels of productivity by developing a set of debugging tools and techniques.

In this endeavor, Clojure's macro capabilities are an extremely powerful ally. Coupled with a fast edit-rerun cycle in the REPL, such tools can turn Clojure debugging into a much less painful activity.


[1]Alternatively, we can evaluate the same expression somewhere in our editor using a Clojure plugin (such as vim-fireplace for Vim).
[2]The astute reader will notice a slight discrepancy between our code and the output of trace-forms. We don't have an if form, or do we? Quiz: what does cond expand to? Complex interactions between macros and functions are yet another reason debugging Clojure code is sometimes hard...
[3]In my professional life I spent far more time writing debuggers than actually using them.
[4]This method is only recommended when the debugging printouts are destined to be eventually eliminated from the code. For more permanent logging with more verbosity controls, consider using a proper logging library like tools.logging.