2

I have a buffered channel which is caching some values (Lets say that its a list with many values). Now I have to fully read (empty) the channel once its buffer is full or a timeout occurs.

After I have done that, I want the channel to again start caching values from the same source till the source is empty.

I have read a lot of materials on the internet but the concepts are still a bit confusing.

How do I do this using clojure.core.async?

EDIT

Okay, so I basically managed to write some code which does the trick.

(require '[clojure.core.async :as a :refer [>! <! >!! <!! go chan buffer close! thread alts! alts!! timeout offer! poll! buffer put!]])

(defn on-overflow-or-timeout [channel]
  (do
    (println "current used space: " (count (.buf channel)))
      (if (> (count (.buf channel)) 0)
      (let [loop-range (range (count (.buf channel)))]
        (do
          (println "retrieving values.....")
          (dorun
          (for [i loop-range]
            (println "retrieved value: " (poll! channel))
            ))))
      (println "No values in the channel. Please restart the process....")
      )))


(defn process [channel buffer-size tout]
  (let [tch (timeout tout)
        check-chan (chan 2)]
    (loop []
      (let [value (read-string (read-line))]
        (do
                (println "Storing the value in actual channel: " value)
                (offer! channel value)
          (offer! check-chan value)
          ; Checking only till half its capacity
          (if (>= (count (.buf channel)) (int (Math/ceil (/ buffer-size 2))))
            (do
              (println "overflowed.....")
              (on-overflow-or-timeout channel)
              (recur)
              )
            (let [[win-val win-chan] (alts!! [check-chan tch])]
              (if (nil? win-val)
                (do
                  (println "timed out.....")
                  (on-overflow-or-timeout channel)
                  (recur)
                  )
                (do
                  (println "retrieved value from check-chan: " win-val)
                  (recur)
                  )))))))))

But I still feel this code needs to be optimised using GO blocks or something. Can anyone point out the flaws in this code and manoeuvre it in the right direction?

PLEASE NOTE that I will be using this code to cache elasticsearch queries and results or something like that and store them somewhere at timeout or when the buffer is full.

3
  • 2
    Show us some code and where you’re stuck? Commented Feb 23, 2016 at 10:41
  • @glts I apologise that I could not even start coding as I am very confused regarding core.async. I am not asking for a full fledged solution. Just a small outline of the whole process or maybe even a pseudo-code of sorts. Commented Feb 23, 2016 at 10:48
  • @glts I have written a bit of code. Just have a look at it and please point out the flaws. Commented Feb 23, 2016 at 18:25

1 Answer 1

2

The fact that a channel has a .buf field is an implementation detail and you should not use it.

Not a 100% about what your requirements are but when working with core.async you have to try to think in terms of "threads" (go blocks) doing just one thing and communicating with other "threads" through channels.

From what I gather from your code, it seems that you want:

  1. A go block that read from read-line. Will write to a chan
  2. A go block that acts immediately on new values. Will read from a chan
  3. A go block that does the buffering. Will read from a chan and write to another
  4. A go block that does whatever with the buffers. Will read from a chan

(2) and (3) need to consume from (1). Use a mult channel so they both get a copy of what (1) writes. (4) can read directly from (3).

A possible implementation of (3) would look like:

(defn buffer-or-timeout [src out buffer-size tout]
  (go-loop [[timeout          buffer to-send]
            [(a/timeout tout) []     nil]]
    (when (seq to-send)
      (println "flushing" to-send)
      (a/>! out to-send))
    (recur (a/alt!
             src ([v] (if (= buffer-size (inc (count buffer)))
                        [(a/timeout tout) [] (conj buffer v)]
                        [timeout (conj buffer v) nil]))
             timeout [(a/timeout tout) [] buffer]))))
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.