Code Read of core.async Timeouts
Clojure’s core.async library allows goroutine-style programming in Clojure, as a library. This is pretty fancy – the ‘as a library’ part especially. One thing to note is that currently the go blocks run on a fixed threadpool of size 2 + number-of-cores. This means if you block within a go block you can end up with thread starvation – every thread in the threadpool is blocked and nothing else gets to run. So it is key that every operation inside the go blocks is non-blocking.
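To make the starvation point concrete, here is a small sketch using a plain java.util.concurrent fixed thread pool rather than core.async’s actual go pool. The pool size of 2 and the names pool, release and third are made up for the example: two tasks block on a CountDownLatch, and a third, trivial task can’t get a thread until they are released.

```clojure
(import '[java.util.concurrent Executors ExecutorService CountDownLatch Future TimeUnit])

;; A plain fixed pool of 2 threads standing in for the go-block pool
(def ^ExecutorService pool (Executors/newFixedThreadPool 2))
(def ^CountDownLatch release (CountDownLatch. 1))

;; Two tasks that block (waiting on the latch) occupy both threads...
(dotimes [_ 2]
  (.submit pool ^Runnable (fn [] (.await release))))

;; ...so a third, trivial task is queued with no thread to run on
(def ^Future third (.submit pool ^Callable (fn [] :ran)))
(Thread/sleep 100)
(def done-while-blocked? (.isDone third)) ; false: the task is starved

;; Releasing the latch frees both threads, and the third task finally runs
(.countDown release)
(def result (.get third 1 TimeUnit/SECONDS)) ; :ran
(.shutdown pool)
```

The third task is not slow, it simply has nowhere to run, which is exactly what a blocking call inside a go block risks.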
A great explanation of non-blocking workflows was given by Martin Trojer. He examines blocking versus non-blocking web requests. Good read.
Another note is that it sounds like the plan is to eventually allow your own executors for core.async, which would let you have custom IO threadpools. One example use case would be a threadpool dedicated to blocking database requests while keeping your ring handlers non-blocking. Cool stuff. I feel like the channel abstraction is something Clojure can build a great non-blocking ecosystem around (think Akka, but simple).
Anywho… the name of the game is non-blocking. One example of taking a blocking call and turning it into a non-blocking one is the timeout function in core.async:
(require '[clojure.core.async :refer [chan go <! >! <!! close! timeout]])

(let [c (chan)]
  (go
    (<! (timeout 1000))
    (>! c "hello from the future!"))
  (prn (<!! c))
  (close! c))
In the above sample the go block is non-blocking (while the <!! does block, outside the go). The (timeout 1000) call returns a channel that will be closed after 1000 milliseconds. Since it uses the channel abstraction, the <! can park the go block and release its thread while it waits on the close; once the channel closes, the <! returns nil and the block carries on.
I was interested in the mechanics of how this works. It seemed like a reasonable place to start examining the core.async codebase, and to get familiar with Java while I was at it. I don’t have a Java background, so my understanding of the different tools on the JVM is limited.
So, to the source!
Luckily the timeout stuff is in a single file, timers.clj, and is only 68 lines long. This seems tractable even for me.
(ns ^{:skip-wiki true}
  clojure.core.async.impl.timers
  (:require [clojure.core.async.impl.protocols :as impl]
            [clojure.core.async.impl.channels :as channels])
  (:import [java.util.concurrent DelayQueue Delayed TimeUnit ConcurrentSkipListMap]))

(set! *warn-on-reflection* true)

(defonce ^:private ^DelayQueue timeouts-queue
  (DelayQueue.))

(defonce ^:private ^ConcurrentSkipListMap timeouts-map
  (ConcurrentSkipListMap.))

(def ^:const TIMEOUT_RESOLUTION_MS 10)
So the key thing here is the :import line. I don’t know what any of those things are. Good times. Immediately after, we create an instance of DelayQueue and one of ConcurrentSkipListMap. I’ve never seen defonce before, but a quick read of the docs says it sets the var’s value only if it doesn’t already have one, so re-evaluating the namespace (say, at the REPL) keeps the existing queue and map. I’m guessing this is the singleton pattern.
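A quick REPL-style check of the difference between def and defonce (plain and once are throwaway names for illustration). Re-evaluating a def replaces the value; re-evaluating a defonce leaves the original alone, which is what keeps a single timeouts-queue and timeouts-map around even if timers.clj is loaded again.

```clojure
;; def always (re)sets the var's root value...
(def plain (atom 0))
(swap! plain inc)
(def plain (atom 0))     ; evaluated again: a fresh atom replaces the old one
(println @plain)         ; prints 0

;; ...while defonce only sets it if the var has no root value yet
(defonce once (atom 0))
(swap! once inc)
(defonce once (atom 0))  ; evaluated again: no-op, the original atom is kept
(println @once)          ; prints 1
```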
So what is a DelayQueue? The docs to the rescue!
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past.
There are two key things here. One is that it’s a BlockingQueue, which means it has a blocking take() operation. The second is that the elements of the queue must implement Delayed. Delayed is an interface that declares one method of its own: getDelay. getDelay takes a TimeUnit and returns the remaining delay associated with the object as a long. It is pretty clear how this would be useful for the DelayQueue: you can make a blocking take call on the queue, and because each item in the queue implements getDelay, the queue can ask each item how long it still has to wait.
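Here is a small sketch of a DelayQueue in action from Clojure, using nothing beyond the JDK. DelayedItem is a hypothetical type (not part of core.async) that implements Delayed much like the TimeoutQueueEntry type in timers.clj. The later item goes in first, yet take hands out the earlier one first, blocking until each delay has expired.

```clojure
(import '[java.util.concurrent DelayQueue Delayed TimeUnit])

;; DelayedItem is a hypothetical type for this sketch: a value plus an
;; absolute deadline in epoch milliseconds.
(deftype DelayedItem [value ^long deadline]
  Delayed
  ;; Remaining delay, converted into whatever unit the queue asks for
  (getDelay [_ unit]
    (.convert unit (- deadline (System/currentTimeMillis)) TimeUnit/MILLISECONDS))
  ;; Delayed extends Comparable: order items by deadline
  (compareTo [_ other]
    (compare deadline (.deadline ^DelayedItem other))))

(def taken
  (let [q   (DelayQueue.)
        now (System/currentTimeMillis)]
    ;; Put the later item first to show the queue orders by delay
    (.put q (DelayedItem. :slow (+ now 200)))
    (.put q (DelayedItem. :fast (+ now 50)))
    ;; Each take blocks until the head item's delay has expired
    [(.value ^DelayedItem (.take q))
     (.value ^DelayedItem (.take q))]))

(println taken) ; prints [:fast :slow]
```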
The last thing is TimeUnit. Again, this is probably pedestrian Java stuff, but I haven’t used Java before. Because getDelay takes a TimeUnit object, the caller can specify what units they want the return value in. For example:
TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)
This would return the number of milliseconds in 10 minutes (600,000).
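The same conversion from Clojure via Java interop, plus two details that matter for getDelay: the toXxx convenience methods, and the fact that converting to a coarser unit truncates. Only standard TimeUnit methods are used; ten-minutes-ms is just a name for the example.

```clojure
(import '[java.util.concurrent TimeUnit])

;; Java: TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)
(def ten-minutes-ms (.convert TimeUnit/MILLISECONDS 10 TimeUnit/MINUTES))
(println ten-minutes-ms) ; prints 600000

;; The convenience methods do the same thing from the source unit's side
(println (.toSeconds TimeUnit/MINUTES 10)) ; prints 600

;; Converting to a coarser unit truncates rather than rounds
(println (.convert TimeUnit/SECONDS 1500 TimeUnit/MILLISECONDS)) ; prints 1
```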
Okay, so out of the list of DelayQueue, Delayed, TimeUnit, and ConcurrentSkipListMap, we now understand the first three. Progress.
The next bit of code defines a type, TimeoutQueueEntry:
(deftype TimeoutQueueEntry [channel ^long timestamp]
  Delayed
  (getDelay [this time-unit]
    (.convert time-unit
              (- timestamp (System/currentTimeMillis))
              TimeUnit/MILLISECONDS))
  (compareTo
   [this other]
   (let [ostamp (.timestamp ^TimeoutQueueEntry other)]
     (if (< timestamp ostamp)
       -1
       (if (= timestamp ostamp)
         0
         1))))
  impl/Channel
  (close! [this]
    (impl/close! channel)))
This type implements Delayed (a Java interface) and impl/Channel (a core.async protocol); Delayed inherits from Comparable, hence the compareTo. It takes a channel and a timestamp to construct. getDelay returns the difference between the timestamp and (System/currentTimeMillis), converted from milliseconds into whatever TimeUnit the caller asked for, while compareTo just compares the timestamp with the other TimeoutQueueEntry’s timestamp. close! is the sole method of Channel, and it just passes the call on to the wrapped channel.
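To convince myself the nested ifs in compareTo are an ordinary three-way comparison, here they are lifted into a plain function. compare-timestamps is a hypothetical name for this sketch; it gives the same -1/0/1 that clojure.core/compare gives for numbers, and used as a comparator it sorts deadlines earliest first.

```clojure
;; compare-timestamps is a hypothetical name: the body is the nested ifs
;; from TimeoutQueueEntry's compareTo, lifted into a plain function.
(defn compare-timestamps [^long timestamp ^long ostamp]
  (if (< timestamp ostamp)
    -1
    (if (= timestamp ostamp)
      0
      1)))

(println (compare-timestamps 100 200)) ; prints -1
(println (compare-timestamps 200 200)) ; prints 0
(println (compare-timestamps 300 200)) ; prints 1

;; For numbers this agrees with clojure.core/compare
(println (every? (fn [[a b]] (= (compare a b) (compare-timestamps a b)))
                 [[100 200] [200 200] [300 200]])) ; prints true

;; As a comparator it sorts deadlines earliest first
(println (sort compare-timestamps [300 100 200])) ; prints (100 200 300)
```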
The Meat
Enough setup… time for the actual timeout function:
(defn timeout
  "returns a channel that will close after msecs"
  [msecs]
  (let [timeout (+ (System/currentTimeMillis) msecs)
        me (.ceilingEntry timeouts-map timeout)]
    (or (when (and me (< (.getKey me) (+ timeout TIMEOUT_RESOLUTION_MS)))
          (.channel ^TimeoutQueueEntry (.getValue me)))
        (let [timeout-channel (channels/chan nil)
              timeout-entry (TimeoutQueueEntry. timeout-channel timeout)]
          (.put timeouts-map timeout timeout-entry)
          (.put timeouts-queue timeout-entry)
          timeout-channel))))
The first thing we do is bind timeout to an absolute time in milliseconds (now plus msecs). We then get a map entry, me, from the timeouts-map using ceilingEntry. timeouts-map is the ConcurrentSkipListMap defined with defonce at the start of the file. Time to figure out what that is.
Going to the docs again for ConcurrentSkipListMap:
A scalable concurrent ConcurrentNavigableMap implementation. The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.
That doesn’t really help – I don’t know what a NavigableMap is. How about NavigableMap?
A SortedMap extended with navigation methods returning the closest matches for given search targets. Methods lowerEntry, floorEntry, ceilingEntry, and higherEntry return Map.Entry objects associated with keys respectively less than, less than or equal, greater than or equal, and greater than a given key, returning null if there is no such key.
Bingo. So ceilingEntry returns the map entry with the least key ‘greater than or equal’ to the supplied value. The value we pass is timeout, so we get back the map entry with the closest deadline at or after this timeout, or null (nil in Clojure) if none exists. The more you know!
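A tiny sketch of ceilingEntry (and its sibling floorEntry) on a ConcurrentSkipListMap, with a few made-up keys standing in for deadlines, to see the ‘least key greater than or equal’ behaviour from Clojure:

```clojure
(import '[java.util.concurrent ConcurrentSkipListMap])

;; A small sorted map of deadline -> label (keys and labels are arbitrary)
(def ^ConcurrentSkipListMap m
  (doto (ConcurrentSkipListMap.)
    (.put 100 :a)
    (.put 200 :b)
    (.put 300 :c)))

(.getKey (.ceilingEntry m 150)) ; => 200, the least key >= 150
(.getKey (.ceilingEntry m 200)) ; => 200, an exact match counts
(.ceilingEntry m 301)           ; => nil, nothing is >= 301
(.getKey (.floorEntry m 150))   ; => 100, the greatest key <= 150
```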
So how do we use this map entry?
(when (and me (< (.getKey me) (+ timeout TIMEOUT_RESOLUTION_MS)))
  (.channel ^TimeoutQueueEntry (.getValue me)))
This bit of code is pretty cool. We have three scenarios:
1. me is nil, so we fail the and and the when evaluates to nil.
2. me has a value but its key timeout IS NOT within TIMEOUT_RESOLUTION_MS (defined as 10) of ours, i.e. it is not less than timeout + 10, so the and fails and the when evaluates to nil.
3. me has a value and its key timeout IS within TIMEOUT_RESOLUTION_MS, so the and succeeds and the when evaluates to that TimeoutQueueEntry’s channel.
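Why do this? It seems to be an optimization. If I request 1,000 timeouts that all land on the same deadline 500 ms from now, they can all share a single channel. More precisely, a new timeout reuses an existing entry whose deadline is at or up to 10 ms after its own, so a shared timeout may fire a little late but never early. That appears to be the purpose of the ConcurrentSkipListMap. Neato.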
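Here is a minimal sketch of that sharing check on its own, assuming a promise as a stand-in for the timeout channel and fixed numbers instead of real clock times. shared-handle, pending and resolution-ms are hypothetical names; the body mirrors the or/when/let shape of timeout. Note the asymmetry: a request reuses an entry at or up to 10 ms after its own deadline, but never one before it.

```clojure
(import '[java.util.concurrent ConcurrentSkipListMap])

(def resolution-ms 10) ; plays the role of TIMEOUT_RESOLUTION_MS

;; deadline -> handle; a promise stands in for the timeout channel
(def ^ConcurrentSkipListMap pending (ConcurrentSkipListMap.))

(defn shared-handle
  "Hypothetical helper: reuse a registered handle whose deadline is in
  [deadline, deadline + resolution-ms), else register a new one."
  [deadline]
  (let [me (.ceilingEntry pending deadline)]
    (or (when (and me (< (.getKey me) (+ deadline resolution-ms)))
          (.getValue me))
        (let [handle (promise)]
          (.put pending deadline handle)
          handle))))

(def h1000 (shared-handle 1000)) ; nothing registered yet: new handle
(def h995  (shared-handle 995))  ; 1000 is in [995, 1005): reuses h1000
(def h1005 (shared-handle 1005)) ; no key >= 1005 yet: new handle
(def h980  (shared-handle 980))  ; ceiling key 1000 is not < 990: new handle
```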
This was the first expression in an or:
(or (when (and me (< (.getKey me) (+ timeout TIMEOUT_RESOLUTION_MS)))
      (.channel ^TimeoutQueueEntry (.getValue me)))
    (let [timeout-channel (channels/chan nil)
          timeout-entry (TimeoutQueueEntry. timeout-channel timeout)]
      (.put timeouts-map timeout timeout-entry)
      (.put timeouts-queue timeout-entry)
      timeout-channel))
The or will short-circuit and return the first logically true value, or the value of the last expression. So if the when returns a channel, we are done, and that channel is the return value of the timeout function. If not, we evaluate the let.
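A few small or expressions to check the short-circuit behaviour. The last part is the one that matters for timeout: when the first branch already returns something, the fallback (in timeout, the let that creates a channel and touches the map and queue) is never evaluated. make-fresh and created are hypothetical names.

```clojure
;; or returns the first logically true value, or the last expression's value
(or nil false :fallback)         ; => :fallback
(or (when true :reused) :fresh)  ; => :reused
(or (when false :reused) :fresh) ; => :fresh

;; Short-circuiting: the fallback's side effects only happen when needed
(def created (atom 0))
(defn make-fresh [] (swap! created inc) :fresh)
(or (when true :reused) (make-fresh))
(println @created) ; prints 0: make-fresh was never called
```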
The let expression creates a new channel and a timeout-entry wrapping it, adds the entry to both the timeouts-map (keyed by its timestamp) and the timeouts-queue, and returns the channel. So now we know how timeouts are created and put in the queue, but not how they are fulfilled.
The Worker
The last piece is the worker, which takes entries off the queue and closes their channels:
(defn- timeout-worker
  []
  (let [q timeouts-queue]
    (loop []
      (let [^TimeoutQueueEntry tqe (.take q)]
        (.remove timeouts-map (.timestamp tqe) tqe)
        (impl/close! tqe))
      (recur))))

(defonce timeout-daemon
  (doto (Thread. ^Runnable timeout-worker "clojure.core.async.timers/timeout-daemon")
    (.setDaemon true)
    (.start)))
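The timeout-worker is an infinite loop, blocking on take from the timeouts-queue. When it gets an entry, it removes the entry from the timeouts-map (the two-argument remove only removes the key if it still maps to this particular entry) and closes the entry’s channel. Closing the channel is what wakes up every <! parked on it, and thanks to the sharing trick above, this one channel could be serving many go block timeouts.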
Since this worker blocks, we put it on its own thread: the timeout-daemon thread. Again, defonce makes sure we only do this once. We create the Thread with the timeout-worker and a name, then mark it as a daemon thread and start it up. So what is a daemon thread? From the docs for Thread.setDaemon:
Marks this thread as either a daemon thread or a user thread. The Java Virtual Machine exits when the only threads running are all daemon threads.
So the timeout-daemon thread won’t stop the JVM from exiting. Makes sense.
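To check my understanding of the whole pipeline, here is a cut-down sketch of the same queue, worker and daemon-thread pattern using only the JDK and Clojure promises instead of core.async channels. PromiseEntry, queue, worker, worker-daemon and promise-timeout are hypothetical names, and the map-based sharing is left out; it is a sketch of the pattern, not a replacement for core.async’s timeout.

```clojure
(import '[java.util.concurrent DelayQueue Delayed TimeUnit])

;; Hypothetical stand-in for TimeoutQueueEntry: holds a promise instead of
;; a core.async channel.
(deftype PromiseEntry [p ^long timestamp]
  Delayed
  (getDelay [_ unit]
    (.convert unit (- timestamp (System/currentTimeMillis)) TimeUnit/MILLISECONDS))
  (compareTo [_ other]
    (compare timestamp (.timestamp ^PromiseEntry other))))

(defonce ^:private ^DelayQueue queue (DelayQueue.))

(defn- worker []
  (loop []
    ;; Blocks this thread (and only this thread) until the earliest entry is due
    (let [^PromiseEntry e (.take queue)]
      ;; "Fulfil" the timeout; timers.clj calls impl/close! on a channel instead
      (deliver (.p e) :done))
    (recur)))

(defonce worker-daemon
  (doto (Thread. ^Runnable worker "sketch/timeout-daemon")
    (.setDaemon true) ; the JVM may exit while this thread is still waiting
    (.start)))

(defn promise-timeout
  "Hypothetical helper: returns a promise delivered roughly msecs from now."
  [msecs]
  (let [p (promise)]
    (.put queue (PromiseEntry. p (+ (System/currentTimeMillis) (long msecs))))
    p))

(println (deref (promise-timeout 100) 1000 :timed-out)) ; prints :done
```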
Java, Java, Java
The majority of this code is built from Java constructs, which I find interesting. I don’t know whether I expected that or not, but I have little experience interfacing Java code with Clojure code (again, not a Java guy), so I enjoyed learning a bit from Java land.
Another cool thing is that we see how timeouts get their own thread to handle blocking. It is a pool of 1, so not too exciting, but it shows how a specialty thread can absorb the blocking and communicate with the non-blocking go threads using channels. A useful pattern when you can’t avoid blocking.
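The last thing is: what does it mean to block? According to the BlockingQueue docs, put can block, “waiting if necessary for space to become available”, and timeout does a put to the queue:
(.put timeouts-queue timeout-entry)
The DelayQueue docs, however, say that because the queue is unbounded, its put never blocks waiting for space; at most it may wait briefly to acquire the queue’s internal lock. The thread that really blocks is the timeout-daemon, on take. Still, the general question stands: when do you care? If it’s in j.u.c., is it going to be a fast enough block that it’s not an issue?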
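A quick check of the DelayQueue side of that question, based on the DelayQueue docs: the queue reports Integer/MAX_VALUE remaining capacity, and puts and a zero-wait timed offer return straight away even with thousands of entries queued. due-now is a hypothetical helper that builds an already-expired Delayed just so there is something to put.

```clojure
(import '[java.util.concurrent DelayQueue Delayed TimeUnit])

(defn due-now
  "Hypothetical helper: a Delayed whose delay has already expired."
  []
  (reify Delayed
    (getDelay [_ unit] 0)
    (compareTo [_ other] 0)))

(def ^DelayQueue q (DelayQueue.))

;; remainingCapacity is always Integer/MAX_VALUE for a DelayQueue:
;; there is never any "space" for a put to wait for.
(println (= Integer/MAX_VALUE (.remainingCapacity q))) ; prints true

;; Thousands of puts, then a timed offer with a zero wait, all return at once
(dotimes [_ 10000]
  (.put q (due-now)))
(println (.offer q (due-now) 0 TimeUnit/MILLISECONDS)) ; prints true
(println (.size q)) ; prints 10001
```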
So, fun stuff. I should spend more time reading code. I’m always left wondering whether my Clojure is aesthetically pleasing, so reading code from people who do this for a living seems like a good way to develop a taste.