Code Read of core.async Timeouts
Clojure’s core.async
library allows go-routine style programming in clojure as a library. This is pretty fancy – the ‘as a library’ part especially. One thing to note is that currently the go
blocks run on a fixed threadpool of size 2 + number-of-cores. This means if you block within a go
thread you can end up with thread starvation – every thread in the threadpool is blocked. So it is key that inside the go
blocks every operation is non-blocking.
A great of explanation of non-blocking workflows was given by Martin Trojer. He examines blocking versus non-blocking web requests. Good read.
Another note is that it sounds like the plan is to eventually allow your own executors for core.async
which would let you have custom IO threadpools. One example use case for this would be having a threadpool dedicated to blocking database requests while keeping your ring handlers non-blocking. Cool stuff. I feel like the channel
abstraction is something Clojure can build a great non-blocking ecosystem around (think Akka but simple).
Timeouts
Any-who … the name of the game is non-blocking. One example of taking a blocking call and turning it into a non-blocking call is the timeout
function in core.async
.
In the above sample the go
block is non blocking (while the <!!
does block outside the go
). The (timeout 1000)
function returns a channel that will be closed after 1000 milliseconds. Since it uses the channel
abstraction the <!
is able to release control of the thread while it waits on the close.
I was interested in the mechanics of how this works. It seems like a reasonable place to start examining the core.async
codebase and get familiar with Java while I was at it. I don’t have a Java background so my understanding of different tools on the JVM is limited.
So, to the source!
timers.clj
Luckily the timeout stuff is in a single file, timers.clj
, and is only 68 lines long. This seems tractable by even me.
So the key thing here is the :import
line. I don’t know what any of those things are. Good times. Immediately after we create an instance of a DelayQueue
and ConcurrentSkipListMap
. I’ve never seen defonce
before but a quick read of the docs says it defines the name only if it hasn’t already been defined. I’m guessing this is the singleton pattern.
DelayQueue
So what is a DelayQueue
? The docs to the rescue!
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past.
There are two key things here. One is that its a BlockingQueue
which means it has a blocking take()
operation. The second is that the elements of the queue must implement Delayed
.
Delayed
is an interface which has one method: getDelay
. getDelay
takes a TimeUnit
and returns the remaining delay associated with the object as a long
. It is pretty clear how this would be useful for the DelayQueue
. You can make a blocking take
call on the queue and each of the items in the queue implements getDelay
.
The last thing is TimeUnit
. Again, this is probably pedestrian Java stuff but I haven’t used Java before. getDelay
taking a TimeUnit
object allows the caller to specify what units they want the return value to be. For example:
This would return the number of milliseconds in 10 minutes.
Okay, so out of the list of DelayQueue
, Delayed
, TimeUnit
, and ConcurrentSkipListMap
we now understand the first three. Progress.
TimeoutQueueEntry
The next bit of code defines a type, TimeoutQueueEntry
.
This type implements two interfaces – Delayed
and Channel
(Delayed
inherits from Comparable
hence the compareTo
) – and takes a channel
and timestamp
to construct.
getDelay
returns the difference between the timestamp
and (System/currentTimeMillis)
while compareTo
just compares the timestamp
with the other TimeoutQueueEntry
’s timestamp
. close!
is the sole method of Channel
and it just passes the call to the internal channel.
The Meat
Enough setup… time for the actual timeout
function.
The first thing we do is bind timeout
to an absolute time value of milliseconds. We then get a map entry, me
, from the timeouts-map
using ceilingEntry
. timeouts-map
is the ConcurrentSkipListMap
defined with defonce
at the start of the file. Time to figure out what that is.
ConcurrentSkipListMap
Going to the docs again for ConcurrentSkipListMap
:
A scalable concurrent ConcurrentNavigableMap implementation. The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.
That doesn’t really help – I don’t know what a NavigableMap
is. How about NavigableMap
:
A SortedMap extended with navigation methods returning the closest matches for given search targets. Methods lowerEntry, floorEntry, ceilingEntry, and higherEntry return Map.Entry objects associated with keys respectively less than, less than or equal, greater than or equal, and greater than a given key, returning null if there is no such key.
Bingo. So ceilingEntry
returns the map entry with a key ‘greater than or equal’ to the supplied value. The value we pass is the timeout
value. So we get back to the map entry with the closest timeout greater than or equal to this timeout, or null
if none exists. The more you know!
So how do we use this map entry?
This bit of code is pretty cool. We have three scenarios:
me
isnull
and we fail theand
and thewhen
evaluates tonil
.me
has a value and the key timeout IS NOT withinTIMEOUT_RESOLUTION_MS
(defined as 10) so theand
fails and thewhen
evaluates tonil
.me
has a value and the key timeout IS withinTIMEOUT_RESOLUTION_MS
so theand
istrue
and thewhen
evaluates to theTimeoutQueueEntry
’schannel
.
Why do this? It seems to be an optimization. If I request 1,000 timeouts all 500 ms from now then they can all share a single channel. That appears to be the purpose of the ConcurrentSkipListMap
timeouts-map
. Neato.
This was the first expression in an or
expression.
The or
will short circuit and return the first logical true value, or the value of the last expression. So if when
returns a channel
we are done and return the channel
as the value of the timeout
function. If not we evaluate the let
expression.
The let
expression above creates a channel
, a timeout-entry
, adds it to the timeouts-map
and the timeouts-queue
and returns the channel
. So now we know how timeouts are created and put in the queue but not how they are fulfilled.
The Worker
The last piece is the worker which reads from the queue and writes to the channels
.
The timeout-worker
is in an infinite loop
/recur
blocking on take
from the timeouts-queue
. When it gets an entry it removes the entry from the timeouts-map
and closes the entries channel
. This channel
could be used by many go
block timeouts.
Since this worker is blocking we put it on its own thread – the timeout-daemon
thread. Again, defonce
makes sure we only do this once. We create the Thread with the timeout-worker
and a name. We then make it a daemon thread and start it up. So what is a daemon thread?
Marks this thread as either a daemon thread or a user thread. The Java Virtual Machine exits when the only threads running are all daemon threads.
So the timeout-daemon
thread won’t stop the JVM from exiting. Makes sense.
Java, Java, Java
The majority of this code is built using Java constructs which I find interesting. I don’t know if I expected that or not but I have little experience interfacing Java code with Clojure code (again, not a Java guy) so enjoyed learning a bit from Java land.
Another cool thing about this is we see how timeouts
create their own thread to handle blocking. It is a pool of 1, so not too exciting, but shows how specialty threads can handle blocking and communicating with the non-blocking go
threads using channels. Useful pattern when you can’t avoid blocking.
The last thing is – what does it mean to block? According to the docs puts
to the queue in timeout
can block – “waiting if necessary for space to become available”.
When do you care? If its in j.u.c is it going to be a fast enough block that its not an issue?
Conclussion
So fun stuff. I should spend more time reading code. I’m always left wondering if my Clojure is aesthetically pleasing or not so reading code from people who do this for a living seems like a good way to develop a taste.