I have a buffered channel that caches some values (let's say they come from a list with many values). I need to drain the channel completely once its buffer is full or a timeout occurs.
After that, I want the channel to start caching values from the same source again, until the source is exhausted.
I have read a lot of material online, but the concepts are still a bit confusing.
How do I do this using clojure.core.async?
EDIT
Okay, I managed to write some code that does the trick:
(require '[clojure.core.async :as a :refer [>! <! >!! <!! go chan buffer close! thread alts! alts!! timeout offer! poll! put!]])
(defn on-overflow-or-timeout [channel]
  ;; (.buf channel) reads the channel's internal buffer to count the queued items.
  (let [n (count (.buf channel))]
    (println "current used space: " n)
    (if (pos? n)
      (do
        (println "retrieving values.....")
        (dotimes [_ n]
          (println "retrieved value: " (poll! channel))))
      (println "No values in the channel. Please restart the process...."))))
(defn process [channel buffer-size tout]
  (let [tch        (timeout tout)   ; a single timeout channel shared by all iterations
        check-chan (chan 2)]
    (loop []
      (let [value (read-string (read-line))]
        (println "Storing the value in actual channel: " value)
        (offer! channel value)
        (offer! check-chan value)
        ;; Checking only till half its capacity
        (if (>= (count (.buf channel)) (int (Math/ceil (/ buffer-size 2))))
          (do
            (println "overflowed.....")
            (on-overflow-or-timeout channel))
          (let [[win-val _] (alts!! [check-chan tch])]
            (if (nil? win-val)
              (do
                (println "timed out.....")
                (on-overflow-or-timeout channel))
              (println "retrieved value from check-chan: " win-val))))
        (recur)))))
But I still feel this code needs to be improved using go blocks or something similar. Can anyone point out the flaws in this code and steer it in the right direction?
Please note that I will be using this code to cache Elasticsearch queries and results (or something similar) and store them somewhere on timeout or when the buffer is full.
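
For illustration, the behaviour described above (flush a batch when it is full or when a timeout fires, then keep collecting until the source closes) could be sketched with a go-loop and alts! roughly as follows. This is only a minimal sketch under assumptions, not a definitive implementation: `batch-chan`, `max-size` and `timeout-ms` are hypothetical names that do not appear in the code above, and the timer here restarts after every flush rather than being created once.

```clojure
(require '[clojure.core.async :as a])

(defn batch-chan
  "Hypothetical helper: reads values from `in` and emits vectors on the
  returned channel. A batch is flushed when it holds `max-size` items or
  when `timeout-ms` has elapsed since the last flush. The returned channel
  is closed once `in` is closed and the remaining items are flushed."
  [in max-size timeout-ms]
  (let [out (a/chan)]
    (a/go-loop [batch []
                tch   (a/timeout timeout-ms)]
      (let [[v port] (a/alts! [in tch])]
        (cond
          ;; Timeout fired: flush what we have (if anything), restart the timer.
          (= port tch)
          (do (when (seq batch) (a/>! out batch))
              (recur [] (a/timeout timeout-ms)))

          ;; Source closed: flush the remainder and close the output.
          (nil? v)
          (do (when (seq batch) (a/>! out batch))
              (a/close! out))

          ;; Normal value: add it, and flush if the batch is now full.
          :else
          (let [batch (conj batch v)]
            (if (>= (count batch) max-size)
              (do (a/>! out batch)
                  (recur [] (a/timeout timeout-ms)))
              (recur batch tch))))))
    out))

;; Usage: seven values, batches of three, one-second timeout.
(def example-batches
  (let [in (a/chan 10)]
    (doseq [x (range 7)] (a/>!! in x))
    (a/close! in)
    (a/<!! (a/into [] (batch-chan in 3 1000)))))
;; example-batches => [[0 1 2] [3 4 5] [6]]
```

Because the batch lives in the loop's local state, nothing here needs to inspect the channel's internal buffer, and a consumer (for example, something that writes each batch to a store) only has to take vectors from the returned channel.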