To learn about concurrency in Clojure, I wrote a function that accomplishes the following:
- Takes a (possibly lazy or infinite) sequence of functions representing tasks to be run asynchronously
- Runs the tasks asynchronously and in parallel
- Immediately returns a lazy sequence of promises that can be deref'd to get the return values of these tasks, in the same order
- Only starts a task once its promise has been requested from the lazy sequence returned
- Uses only up to a given number of threads at a time
My implementation, below, is based on the following algorithm:
- When an item in the lazy sequence is requested, create a promise.
- Place the task that needs to be executed in a ref containing a queue, and also place the promise in a separate ref containing another queue.
- Check the task queue, which entails the following:
  - If there is a task on the queue ref, and the number of threads currently in use is less than the allowed number of threads, then:
    - Take the task off the queue, and increase the number of threads in use.
    - Run the task in a future; once it is done, deliver the return value to the corresponding promise, decrease the number of threads in use, and check the task queue again.
- Return the promise.
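The queue handling in the steps above can be sketched in isolation. This is only a minimal illustration, assuming two global refs; the helper names enqueue! and dequeue! are hypothetical and do not appear in the implementation.

```clojure
;; Two refs, each holding a persistent FIFO queue.
(def task-queue    (ref clojure.lang.PersistentQueue/EMPTY))
(def promise-queue (ref clojure.lang.PersistentQueue/EMPTY))

(defn enqueue!
  "Hypothetical helper: adds a task and a fresh promise to their queues
  in one transaction, then returns the promise."
  [task]
  (let [p (promise)]
    (dosync
      (alter task-queue conj task)      ; conj appends to the rear of the queue
      (alter promise-queue conj p))
    p))

(defn dequeue!
  "Hypothetical helper: removes the oldest task and its promise in one
  transaction. Returns [task promise], or nil if the queue is empty."
  []
  (dosync
    (when-let [task (peek @task-queue)] ; peek reads the front of the queue
      (let [p (peek @promise-queue)]
        (alter task-queue pop)          ; pop drops the front element
        (alter promise-queue pop)
        [task p]))))
```

Because both queues are changed inside the same dosync, other transactions always see them with the same number of elements, so a task and its promise stay paired.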
(ns temp.core)

(defn thread-safe-println
  [& more]
  (locking *out* (apply println more)))

(def empty-queue clojure.lang.PersistentQueue/EMPTY)

(defn check-queue
  [max-threads free-threads task-queue promise-queue]
  (dosync
    (when (and (seq @task-queue)
               (pos? @free-threads))
      (let [task (first @task-queue)
            the-promise (first @promise-queue)]
        (alter free-threads dec)
        (alter task-queue pop)
        (alter promise-queue pop)
        ; is it ok to use a future in a transaction?
        ; or does it count as a side-effect and it
        ; might be run multiple times (undesirable)?
        (future
          (let [rv (task)]
            (deliver the-promise rv)
            (dosync
              (alter free-threads inc))
            (check-queue max-threads free-threads task-queue promise-queue)))))))

(defn throttled-futures
  "Takes (possibly lazy or infinite) sequence of tasks and runs them
  in parallel using up to a specified number of threads at a time.
  Returns a lazy sequence of objects that can be deref'd to get the
  results of the corresponding tasks. Tasks are not scheduled until the
  corresponding objects in the lazy sequence are requested."
  ([max-threads tasks]
   (throttled-futures
     max-threads
     tasks
     (ref max-threads)
     (ref empty-queue)
     (ref empty-queue)))
  ([max-threads tasks free-threads task-queue promise-queue]
   (lazy-seq
     (if (seq tasks)
       (cons (let [the-promise (promise)]
               (dosync
                 (let [task (first tasks)]
                   (alter task-queue conj task)
                   (alter promise-queue conj the-promise)))
               (check-queue max-threads free-threads task-queue promise-queue)
               the-promise)
             (throttled-futures
               max-threads
               (rest tasks)
               free-threads
               task-queue
               promise-queue))))))

(defn get-futures
  [number-of-futures]
  (println "Realizing" number-of-futures "futures")
  (let [futures (doall
                  (take number-of-futures
                        (throttled-futures
                          3
                          ; using a higher-arity form of map prevents chunking
                          ; see http://stackoverflow.com/a/16567104/3538165
                          (map (fn [index _]
                                 (fn []
                                   (thread-safe-println "Starting task" index)
                                   (Thread/sleep (* (rand) 1000))
                                   (thread-safe-println "Finished task" index)
                                   index))
                               (range 10)
                               (range 10)))))]
    (thread-safe-println futures)
    (dorun (map deref futures))
    (thread-safe-println futures)))
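The comment in check-queue asks whether a side effect inside a transaction might run more than once. The sketch below is separate from the implementation and uses made-up names (counter, attempts). It deliberately forces one conflict to show that the whole dosync body, side effects included, is executed again when a transaction retries.

```clojure
;; Hypothetical names, for illustration only.
(def counter  (ref 0))
(def attempts (atom 0)) ; counts how many times the transaction body runs

(dosync
  (swap! attempts inc)  ; side effect: not rolled back when the transaction retries
  (when (= 1 @attempts)
    ;; On the first attempt only, another thread commits a change to
    ;; counter while this transaction is still in progress.
    @(future (dosync (alter counter inc))))
  ;; On the first attempt this conflicts with the commit made above,
  ;; so the STM discards the attempt and runs the body again.
  (alter counter inc))

(println "attempts:" @attempts "counter:" @counter)
```

Under these assumptions the body should run twice (attempts is 2) while counter ends at 2: one increment from the other thread and one from the attempt that finally committed. By contrast, send and send-off to an agent from inside a transaction are held until the transaction commits, so they are a common way to trigger a side effect only once.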
I included get-futures to easily test throttled-futures, like so:
temp.core=> (get-futures 5)
Realizing 5 futures
Starting task 0
Starting task 2
Starting task 1
(#object[clojure.core$promise$reify__6779 0x708d9ee0 {:status :pending, :val nil}] #object[clojure.core$promise$reify__6779 0x68964df5 {:status :pending, :val nil}] #object[clojure.core$promise$reify__6779 0x4876340d {:status :pending, :val nil}] #object[clojure.core$promise$reify__6779 0x72737be0 {:status :pending, :val nil}] #object[clojure.core$promise$reify__6779 0x6e1581d0 {:status :pending, :val nil}])
Finished task 2
Starting task 3
Finished task 3
Starting task 4
Finished task 4
Finished task 1
Finished task 0
(#object[clojure.core$promise$reify__6779 0x708d9ee0 {:status :ready, :val 0}] #object[clojure.core$promise$reify__6779 0x68964df5 {:status :ready, :val 1}] #object[clojure.core$promise$reify__6779 0x4876340d {:status :ready, :val 2}] #object[clojure.core$promise$reify__6779 0x72737be0 {:status :ready, :val 3}] #object[clojure.core$promise$reify__6779 0x6e1581d0 {:status :ready, :val 4}])
As you can see, it starts three (value of max-threads) tasks, creates all of the promises, and then returns them. Then, once a task is finished, it starts a new task. In this case, it turned out that task 0 was long, so it finished last even though it was started first. Also, only five tasks are run because only the first five elements of the lazy sequence throttled-futures returned were realized.
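The code comment about chunking matters for the "only five tasks" behaviour, and a small sketch can make it visible. The atoms calls-chunked and calls-unchunked are hypothetical names used only to count how many elements get realized when just the first one is requested.

```clojure
(def calls-chunked   (atom 0))
(def calls-unchunked (atom 0))

;; Single-collection map over a chunked source such as (range 100):
;; asking for the first element realizes a whole chunk (typically 32 elements).
(first (map (fn [i] (swap! calls-chunked inc) i)
            (range 100)))

;; Higher-arity map walks its inputs one element at a time,
;; so asking for the first element realizes only that element.
(first (map (fn [i _] (swap! calls-unchunked inc) i)
            (range 100)
            (range 100)))

(println "chunked:" @calls-chunked "unchunked:" @calls-unchunked)
```

With a chunked task sequence, requesting one promise from throttled-futures could realize many task functions at once, although scheduling is still driven by the lazy-seq in throttled-futures itself.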
Here are some of my concerns with this code:
- Are my uses of ref and dosync correct and/or appropriate?
- Is this code thread-safe in all situations, or is it possible for my refs to get out of sync?
- Are there any major performance concerns?
- Is there a more elegant way to approach this problem (for instance, by using a fixed number of threads instead of creating new ones and terminating old ones)?
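To make the last bullet concrete, here is a rough sketch of what a fixed-number-of-threads variant might look like, using a java.util.concurrent fixed thread pool. It is an assumption about one possible shape, not a recommendation or a drop-in replacement: pooled-futures is a hypothetical name, and the elements it returns are java.util.concurrent.Future objects rather than promises.

```clojure
(import '(java.util.concurrent Callable ExecutorService Executors Future))

(defn pooled-futures
  "Hypothetical variant: lazily submits each task to a fixed-size pool.
  A task is only submitted when its element of the result is realized;
  the pool itself limits how many tasks run at the same time."
  [^ExecutorService pool tasks]
  (lazy-seq
    (when-let [s (seq tasks)]
      ;; The Callable hint selects the submit overload that keeps the return value.
      (let [^Callable task (first s)]
        (cons (.submit pool task)
              (pooled-futures pool (rest s)))))))

(let [pool    (Executors/newFixedThreadPool 3)
      ;; higher-arity map, as in get-futures, to avoid chunking
      tasks   (map (fn [i _] (fn [] (* i i))) (range 10) (range 10))
      futures (doall (take 5 (pooled-futures pool tasks)))
      results (mapv (fn [^Future f] (.get f)) futures)] ; blocks until each task is done
  (.shutdown pool)
  (println results))
```

In this sketch the executor's internal queue takes the place of the two queue refs and the free-threads counter, so no transactions are involved. The caller is responsible for shutting the pool down.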