Blake Smith


understanding clojure concurrency, part 2

This is part 2 of a two-part series I’m writing about Clojure concurrency. Check out the first post here.

concurrency primitives

Clojure provides a grab-bag of concurrency primitives and functions that give meaning and control to your multi-threaded code. Most programming languages leave these constructs up to the library writers - Clojure takes a different stance.

atoms

First, I’d like to talk about atoms. The concept of atomicity might not be new to you, but let’s recap just to solidify the idea. Atomicity implies that even in an environment with multiple actors (threads) working on the same piece of data, you can guarantee that updates to that data either happen entirely, or don’t happen at all. In practice, this is used when multiple threads are reading and writing to one data structure to share its contents. Here’s how you create an atom in Clojure:

user=> (atom 1)
#<Atom@15c313da: 1>

This will create an atom that holds the number 1 inside of it. To fetch the value of the atom, you use the deref function.

user=> (def a (atom 1))
#'user/a
user=> a
#<Atom@3f78e13f: 1>
user=> (deref a)
1
user=>

You can also deref an atom using the ‘@’ shortcut:

user=> (def a (atom 1))
#'user/a
user=> @a
1
user=>

To change the value of an atom, you use the swap! function.

user=> a
#<Atom@30394ffa: 1>
user=> (swap! a inc)
2
user=>

The swap! function takes the atom as its first argument, and a function to apply to the value in the atom. In this case, we applied the ‘inc’ function, which increments the integer by 1.

Under the covers, the swap! function is doing a compare-and-set! operation, which means that if two threads are in a race to apply their function to the atom, the loser will retry their swap if the value has changed since they began the swap. Therefore, it’s very important that ‘swap!’ procedures be completely free of side-effects. No log writing, database updating or anything else - otherwise you’ll get unpredictable behavior in your I/O layers.
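You can watch this retry behavior at the REPL. In this sketch (the variable names are mine, not from any library), a second atom counts how many times the swap function actually ran; under write contention it typically ends up higher than the number of successful swaps, which is exactly why side effects inside swap! are dangerous:

```clojure
(def counter (atom 0))
(def calls   (atom 0)) ; counts how many times the swap fn actually ran

;; Ten threads each increment `counter` 1000 times. The swap function
;; also bumps `calls`, standing in for an accidental side effect.
(def workers
  (doall
    (for [_ (range 10)]
      (future
        (dotimes [_ 1000]
          (swap! counter (fn [v]
                           (swap! calls inc) ; runs once per ATTEMPT
                           (inc v))))))))

(run! deref workers) ; wait for all the threads to finish

@counter ;=> 10000 (every increment applied exactly once)
@calls   ;=> 10000 or more -- retries re-run the function
```

If the side effect had been a log write or a database update, it would have fired once per attempt, not once per successful swap.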

Atoms also allow you to specify a :validator function. This function will be executed whenever the atom is attempting to change state:

user=> (def a (atom 1 :validator #(< % 10)))
#'user/a
user=> a
#<Atom@476dc5c9: 1>
user=> (swap! a (partial + 8))
9
user=> a
#<Atom@476dc5c9: 9>
user=> (swap! a inc)
java.lang.IllegalStateException: Invalid reference state (NO_SOURCE_FILE:0)
user=> 

In the example, we created an atom whose value must always be less than 10. The first time we apply a swap function, the atom successfully changes to 9. When we try to increment the atom up to 10, an IllegalStateException is thrown. This is really useful in cases where retrying a swap operation might put the atom in an illegal state, especially during thread races.

What I love about atoms is how simple they are. You take an immutable persistent data type and you can make it atomic, without having to remake your entire processing pipeline or the underlying data structure. You get a tool that helps create data that is shared, synchronous, and independent in state. How do you know when it’s the right decision to use an atom? The general rule of thumb is that if you only have to manipulate one data item concurrently and synchronously, you should use an atom. A good example of this might be a counter that counts how many items have been sold in an online store - there might be multiple web servers trying to increment and decrement this number at once, but the count will always be guaranteed to be in a single state at any given moment in time.
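That store counter might look like this minimal sketch (the function names are made up for illustration):

```clojure
(def items-sold (atom 0))

(defn record-sale! []
  (swap! items-sold inc))

(defn record-refund! []
  (swap! items-sold dec))

;; Simulate many web server threads hitting the counter at once.
(let [sales   (for [_ (range 100)] (future (record-sale!)))
      refunds (for [_ (range 20)]  (future (record-refund!)))]
  (run! deref (concat sales refunds)))

@items-sold ;=> 80
```

No matter how the threads interleave, every increment and decrement is applied exactly once, so the final count is always 80.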

When you start needing to coordinate a group of data items that need to mutate together, you should start to think about Refs.

refs

Because Clojure has immutable data structures, Refs provide a mechanism to track a single identity across a series of values (facts) over a period of time. Let’s look at an example of this.

Here are some facts I know to be true:

  1. On July 22, 2011 - Bob Anderson lived in Chicago.
  2. On September 14, 2011 - Bob Anderson lived in San Francisco.

In most programming languages, you might have a variable called ‘userLocation’ that you would update when Bob moves from Chicago to San Francisco. As far as your program is concerned, when you update the ‘userLocation’ variable, your program loses awareness that Bob ever lived in Chicago. Once you rewrite that variable, Bob’s previous location is lost forever. Now just because you updated the ‘userLocation’ to San Francisco doesn’t change the fact that at an earlier point in time, Bob lived in Chicago. What’s worse, any other thread that was holding on to the ‘userLocation’ variable would have its value change out from under it. This is where the nasty dragons live.

What a Clojure Ref does is track a series of facts over time. So in the Clojure world, conceptually, a Ref looks like this:

Question: “Where does Bob live at this point in time?”

  1. On July 22, 2011 - Bob Anderson lived in Chicago.
  2. On September 14, 2011 - Bob Anderson lived in San Francisco.

So in Clojure, you would have a Ref called user-location, and if Bob moves to a new location, you update the Ref to point to the newest fact (Bob living in San Francisco). Again, what a Ref does is track values over time. Before Bob moved, the Ref pointed to the fact that Bob lived in Chicago. When Bob moves, on September 14, 2011, the Ref points to the fact that Bob now lives in San Francisco. Anyone holding on to the previous value of ‘Chicago’ would still retain that value until it updated its state from the Ref.

But that’s not the main use case for Refs.

If atoms are for managing the fact changes of a single data structure, then Refs are for managing changes of multiple data structures together, atomically. Put simply, Refs give you in memory transactions using a software transactional memory system. I’m not going to get into the details of software transactional memory systems, but if you’re curious I’d recommend doing some reading on them. Let’s take our example with Bob and elaborate on it more.

Bob’s company stores 2 facts about him, where he lives and how much his salary is. Both of these pieces of data always move together in a coordinated fashion to adjust for cost of living. When Bob moves from Chicago to San Francisco, his salary also needs to increase at the exact same time. Because these two facts are bound together, they either both change, or neither of them change. Let’s setup our two Refs and get started:

	(def location (ref "Chicago"))
	(def salary (ref 30000))

You access Ref values the same way you access atom values, using the deref function (or the @ prefix):

user=> @location
"Chicago"
user=> @salary
30000

We come to September 14, 2011 - time for Bob to move and get a salary bump at the exact same time. For this to happen as one atomic change, we need a transaction. Note that it is impossible for you to change the value of a Ref without a transaction. Clojure provides the ‘dosync’ macro that wraps an expression in a transaction for Ref changes.

	(dosync
	  (ref-set location "San Francisco")
	  (alter salary (partial + 15000)))

The first Ref we altered by simply changing its value to a completely new value. Again, Bob’s location used to be Chicago, but now it’s San Francisco. We achieved this change using the ‘ref-set’ function, which allows you to alter the value of a Ref outright. At the same time Bob moves to San Francisco, he gets a $15,000 pay increase. We represent this pay increase by applying the ‘alter’ operation to the salary Ref. Similar to the ‘swap!’ operation of the Atom, the alter command takes the Ref and a function to apply to the current Ref value - in our case we’re applying a plus operation to his salary. As far as other threads are concerned, both of these changes will happen at the same point in time. After we alter the values, we get the new expected values by derefing:

user=> @location
"San Francisco"
user=> @salary
45000
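It’s worth seeing what happens if you forget the transaction. With a fresh Ref (the name here is made up), ref-set outside of a dosync throws:

```clojure
(def city (ref "Chicago"))

;; No surrounding dosync -- ref-set throws an IllegalStateException
;; ("No transaction running").
(try
  (ref-set city "San Francisco")
  (catch IllegalStateException e
    (.getMessage e))) ;=> "No transaction running"

;; Wrapped in a transaction, the same call succeeds.
(dosync (ref-set city "San Francisco"))
@city ;=> "San Francisco"
```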

Refs give you the amazing power to coordinate several data structures, forcing their changes to occur together as one atomic group. Similar to atoms, if two threads attempt to change the value of a Ref at the same time, one of them will succeed and the other will fail. When the second one fails, it will be automatically retried. So, just like with Atoms, Ref transactions must be completely free of side effects.

Keep in mind that the more data structures you group together into a transaction, the more transaction retries you will experience under heavy write load, and the more your performance will degrade. The less your data structures have to coordinate, the better. Think about it like trying to schedule a meeting with someone else. It’s not too hard for the two of you to find a common time to meet, but what happens when you have to schedule a meeting with ten busy people? It becomes much harder to get everyone in the same room at the same time. People often end up not showing up, or the meeting gets moved or cancelled due to last minute scheduling conflicts. The same goes for manipulating many data structures in a single transaction. Remember this concept when doing coordinated Ref manipulation: the faster the transactions, and the fewer the participants, the better. Less coordination helps computers scale horizontally and go faster by doing more at once.

futures

Futures are one of my favorite Clojure concurrency primitives. They offer a simple and flexible way to dispatch operations into another thread and then retrieve the results of that computation from the existing thread. Put more simply, they’re terrific at producing a request/response operation that occurs between two threads.

To start off, let’s start playing around a bit:

	(future (Thread/sleep 1000) (prn "Hello futures!"))

When you execute this in your REPL, you’ll notice that it returns immediately, and 1 second later a print message is sent to standard out. This use of futures doesn’t give us much over newing up a Thread and telling it to print to the screen. Futures give us the ability to deref and retrieve the value the Thread returns in the exact same way we deref Atoms and Refs. Let’s do that now using a simple factorial function:

	(defn factorial [n]
	  (apply * (map bigint (range 1 (inc n)))))

If we want to find the factorial of a very large number, we can perform this computation in another thread and then retrieve the computed value using our handy friend deref:

	(def result (future (factorial 10000)))
	... do some other things here ...
	(deref result)

Following the same pattern, deref will return the result immediately if the future has already completed, otherwise it will block until the future is done. In this case, we dispatched a really expensive action to another thread, so our main thread could continue execution. Then, when our main thread is ready to consume the result of the future, it simply derefs it like we’ve seen before. I like to think of this like a request/response pattern that spans multiple threads instead of two servers. You can offload the heavy lifting to something else, and keep on doing what you’re doing.
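One more deref trick that pairs nicely with futures: deref also takes an optional timeout in milliseconds and a fallback value to return if the future hasn’t finished yet, so you can poll without blocking forever. A quick sketch (names are mine):

```clojure
(def slow-result (future (Thread/sleep 2000) :done))

;; Wait at most 100 milliseconds; if the future isn't finished yet,
;; return the fallback value instead of blocking.
(deref slow-result 100 :not-yet-ready) ;=> :not-yet-ready

;; A plain deref still blocks until the computation completes.
@slow-result ;=> :done
```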

Now let’s step it up another notch and do something non-trivial. A user on my website uploads a photo album, and I want to back all of the images up to Amazon S3. I could iterate through each photo and upload them one at a time, but that would be slow and would waste a lot of CPU cycles simply waiting for a response from Amazon. Why don’t we instead use a collection of futures to fan out the images simultaneously? Let me show you how this is done.

First, let’s assume the user has just submitted all the files to the server, and they’re sitting in memory as a collection of java BufferedImages. To simulate this, I’m just going to read the same image 10 times:

	(import [javax.imageio ImageIO])
	
	(def images
	  (take 10
	        (repeatedly
	         #(ImageIO/read (clojure.java.io/input-stream
				 "/Users/blake/Pictures/blake_signature.jpg")))))

We use the ‘repeatedly’ function to generate an infinite lazy sequence of BufferedImages, and realize only 10 of them with the ‘take’ function. Now that we have a collection of BufferedImages, let’s upload all 10 of them concurrently to S3. To do this, I’m going to use the ‘clj-aws-s3’ library. First, let’s write our function that will upload the image to S3 itself:

	(require '[aws.sdk.s3 :as s3])
	(import [java.io ByteArrayOutputStream])
	
	(def credentials
	  {:access-key "my s3 access key"
	   :secret-key "super secret key"})
	   
	(def bucket-name "user-images")   
	
	(defn save-image [image filename]
	  (with-open [os (ByteArrayOutputStream.)]
	    (ImageIO/write image "jpg" os)
	    (let [request
	          (s3/put-object credentials bucket-name filename
	                         (clojure.java.io/input-stream (.toByteArray os)))]
	      {:finished true :request request})))

That looks like a lot, but it’s not so bad. First we set up some boilerplate: we require the s3 library, define our S3 credentials as a hash map, and set our S3 bucket name. Our ‘save-image’ function takes a BufferedImage as input, and a desired filename for the image in our S3 bucket. From there, we open a new ByteArrayOutputStream, which is where we write the raw bytes from our image in preparation for uploading them. After that, we invoke ‘s3/put-object’ with our credentials, bucket name, filename and an InputStream backed by our ByteArrayOutputStream. We basically wrote the bytes out into memory and then read them back in to upload to S3. That call returns an S3 request object, which we put in a hash map along with an easy to access :finished key - and that map is what the function returns. Conceptually, all this function is really doing is reading the raw bytes out of the BufferedImage and then shuttling them off to S3.

Now, let’s write one more function that takes our entire BufferedImage collection as input and utilizes futures to fan multiple uploads out to S3 simultaneously:

	(defn upload-images [images]
	  (doall
	    (map-indexed
	     (fn [i image]
	       (future (save-image image (format "myimage-%s.jpg" i))))
	     images)))

We’re mapping over our collection of images, and for each image we’re generating a future whose body will upload the image to S3. When we invoke this function, we’ll get back a collection of futures, each one representing one image that’ll get uploaded by one of the threads in the thread pool. Keep in mind that once the thread pool is exhausted, other futures will have to wait for a thread to become available again.

Let’s try out our parallel uploading:

	(def f (upload-images images))
	(map deref f)

When every single image upload completes, you’ll get back a collection of hashes like the one we defined in ‘save-image’.

Like I mentioned earlier, futures are one of my favorite concurrency primitives, because they allow you to dish out heavy or long running tasks to a thread pool with ease, letting your main execution path continue on its merry way and not get bogged down by long running requests.

agents

Let’s say you have a problem that almost warrants the use of an Atom, but you need your functionality to be asynchronous instead of synchronous. On top of that, you know that it’s a bad idea to use Atoms when your functions have side-effects, because they might get invoked several times when threads are competing, but your functions are going to do some I/O. What can you do? You should consider using Agents.

Note: the following example was borrowed from Will Larson

One great use case for Agents is for shared logging. We’re writing our program, and we have several threads that need to log to the same shared BufferedWriter file. We know that BufferedWriter is not a thread safe data structure, and ignoring this fact will lead to strange interleaving in our log files due to thread contention. We also know that our logging use case is something that can happen asynchronously from our main app logic - logging really shouldn’t be stopping the main flow of your program execution. Agents will help us solve this problem quite easily.

First, we wrap the BufferedWriter in an agent, similar to how we wrap Atoms and Refs.

	(import [java.io BufferedWriter FileWriter])
	
	(def logfile
	  (agent (BufferedWriter. (FileWriter. "shared-logfile.log"))))

Now let’s go ahead and write our log function that’s going to handle writing out to the BufferedWriter.

	(defn write-out [out msg]
	  (.write out msg)
	  out)

This function writes to the BufferedWriter, and then returns the instance of the BufferedWriter. We’ll see why it’s important to return the BufferedWriter instance in a moment.

Now, we need a way to dispatch our write-out function to the agent. To do this, we use the built-in ‘send’ function.

	(defn log [logger msg]
	  (send logger write-out msg))
	  
	(defn close [logger]
	  (send logger #(.close %)))

The first parameter to ‘send’ is the agent itself, followed by the function to apply to the agent, and any additional arguments to pass into the function. In our case, we’re applying the write-out function to the logger agent and passing the message along to the write-out function. Then we can send log events to the logger using the ‘log’ function:

	(log logfile "My awesome message")
	(log logfile "My other awesome message")
	(close logfile)

And both of those log lines would be present in our log file. This is all fine and well, but it doesn’t really demonstrate the reason we used agents in the first place. We chose agents because we wanted to be able to have multiple threads writing to the same log file without any interleave or contention issues. Let’s throw a bunch of threads at our logger, and have them all write out to the log file:

	(dotimes [i 1000]
	  (future (log logfile (format "Message number %s\n" i))))
	  
	(close logfile)

This will utilize a thread pool to write all those log events to the file. When I open up my logfile to see the results, I get something that looks like this:

Message number 1
Message number 3
Message number 5
Message number 9
Message number 11
Message number 22
Message number 23
Message number 24
Message number 25
...

You’ll notice that the log numbers are not in order - multiple threads were submitting writes at the same time, so ordering is not guaranteed or expected. The good news is that none of our log lines are interleaved. This is due to our functions being applied to the agent in a serial fashion. Think of agents like a queue for the functions you want to apply to some piece of state. Even though multiple threads are trying to apply their function to the agent at the same time, all the function calls get ordered and applied one after the other.
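Related to this queueing behavior, Clojure’s built-in ‘await’ function blocks the calling thread until the agent has processed every action dispatched to it so far from your thread - handy when you need to know all pending writes have landed before closing the file. A self-contained sketch (the log file name is made up):

```clojure
(import [java.io BufferedWriter FileWriter])

;; The same logger-agent pattern as above, writing to a throwaway file.
(def logfile
  (agent (BufferedWriter. (FileWriter. "await-example.log"))))

(defn write-out [out msg]
  (.write out msg)
  out)

(dotimes [i 100]
  (send logfile write-out (format "Message number %s\n" i)))

;; `await` blocks until every action sent so far from this thread has
;; been applied, so all 100 writes have hit the BufferedWriter here.
(await logfile)
(send logfile #(doto % .close))
```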

There are two ways you can dispatch a function to an agent. You can use the ‘send’ function like we did in the example, or you can use the ‘send-off’ function. On the surface, these two functions appear to do the same thing, with a subtle difference. Under the covers, ‘send’ uses a thread pool that has a fixed size, depending on the number of cores your CPU has. This means that if one of the threads from the thread pool blocks for a long period of time, you risk starving your agent from available threads. If all the threads in the thread pool block for a long period of time, you’ll get a backup of ‘sends’.

On the other hand, using ‘send-off’ will utilize a separate thread pool that is capable of growing in size. This means that if your threads block for any lengthy period of time, more threads can be created to accommodate the increasing demand (up to a certain point).

What does this mean for you as the programmer? The general rule of thumb is if you’re making long I/O calls, like going out to the network for something, use ‘send-off’. If your operations are generally CPU bound, or risk no chance of blocking for a long time, use ‘send’.
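Here’s that rule of thumb sketched in code (the agent names and helper functions are made up for illustration):

```clojure
;; I/O bound: fetching a URL can block for a long time, so use
;; send-off, which draws from a growable thread pool.
(def page-cache (agent {}))

(defn fetch-page! [cache url]
  (send-off cache assoc url (slurp url)))

;; CPU bound: a quick in-memory update, so use send, which draws
;; from the fixed-size pool.
(def stats (agent {:count 0}))

(defn bump-count! [s]
  (send s update :count inc))
```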

promises

In a basic sense, a promise is a contract between one thread and another to deliver a calculation at some point in the future. Put another way, a promise is a mechanism to safely deliver data between two threads. Let’s walk through a circumstance where using a promise might ease the ability to share data between concurrent processes.

Imagine you’re planning a party. The restaurant party manager calls you three weeks before the event to get the guest count. You don’t yet know how many people are going to come to your party, but you expect you’ll have the final head count soon. You tell the party manager that you’re still working on getting the final head count and that you’ll call him and let him know as soon as you’ve got the number nailed down. You work on getting the last RSVPs gathered, and three days later you call the party manager back and give him the final count.

In this example, you don’t know what the exact guest count is, but you know it’s a number you’re going to have to give the party manager in the future - so you make him a promise that you’ll tell him the final count as soon as you know it. Then you go about calling your invited guests to sum the total count. Once you know the number, you deliver on that promise by calling the manager back and telling him what it is.

This interaction represents how you might use Clojure promises. The typical pattern goes like this:

  • A thread generates a promise by invoking the ‘promise’ function with no arguments.
	(def guest-count (promise))
  • That thread passes the promise along to another thread. This states the intention “I am going to deliver the guest count to you at some point in the future when I know it”. That other thread is free to do other things while waiting on the value contained in the promise, and can periodically check whether the value has been delivered.
	(future (manager-duties guest-count))

In this case, we use a future to perform all the manager duties in another thread. The manager duties might be described like this:

	(defn talk-to-guests []
	  (prn "Talking to guests"))
	
	(defn train-wait-staff []
	  (prn "Training wait staff"))
	
	(defn check-party-guest-count [p]
	  (if (realized? p)
	    (deref p)
	    (prn "Don't know the guest count yet, call later")))
	
	(defn manager-duties [cnt]
	  (talk-to-guests)
	  (train-wait-staff)
	  (if-let [count (check-party-guest-count cnt)]
	    (prn (format "Total guest count is %s" count))
	    (do
	      (Thread/sleep 1000)
	      (manager-duties cnt))))

We set up a few tasks for the party manager to perform, and he performs them continually. Notice the use of the ‘realized?’ function in the ‘check-party-guest-count’ function. This function can be used on promises to check whether the promise has been delivered yet. You may need to do this in circumstances where you don’t want to invoke ‘deref’ and block until the promise is delivered.

  • The guest count is delivered from the main thread, thus completing the promise. The manager stops performing his tasks.
	(deliver guest-count 50)

When I first encountered promises, I was confused about how they differ from futures. When do I use one and when would I use the other? Let me explain the difference. With futures, your main thread of execution is delegating work to another thread. When the value of the future comes back, the main thread gets to harvest this value and benefit from it. With promises, this relationship is usually inverted. The main thread of execution generates a promise, and passes it off to another thread. The main thread of execution does the hard work to generate the value that will be delivered, and delivers it to the second thread. The second thread gets to benefit from the hard work of the main thread.

One handy rule of thumb for determining whether to use a future or a promise is:

  1. I want another thread to do my work for me and tell me the result: use a future.
  2. I want to do the work and tell the other thread when I’m done: use a promise.
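A minimal sketch of the two rules side by side:

```clojure
;; 1. Future: another thread does the work, and I harvest the result.
(def answer (future (reduce + (range 1000000))))
@answer ;=> 499999500000

;; 2. Promise: I do the work and hand the result to the other thread.
(def p (promise))
(def waiter (future (str "Got: " @p))) ; blocks on @p until delivery
(deliver p 42)
@waiter ;=> "Got: 42"
```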

the world ahead

I hope you’ll come away from this feeling more empowered to write concurrent Clojure programs. Part of the challenge of learning this stuff is figuring out the new patterns and different ways of thinking that are required to write concurrent code. Concurrent and parallel programming is a very big topic, and this series of posts serves as a way to get excited about the potential of multi-threaded programming with Clojure. In a lot of ways, our computer systems have run out of affordable vertical scaling runway - the future holds the growing need to make our computer systems grow horizontally, and thus places more demands on the programmer to write concurrent code. I believe that mastering this stuff can open up doors to all kinds of problems you didn’t even think were solvable. Jump in, and have fun!


about the author

Blake Smith is a Principal Software Engineer at Sprout Social.