
I am looking for an idiomatic way to do the following. I have an HTTP server that responds to a particular GET request with a stream of messages. Since the stream never terminates, a call to clj-http's client/get just blocks forever (I am using LightTable). I would like to set up a callback or a core.async-style channel to process each message as it comes in. Even writing the stream to a file would be a good first step for me. Any pointers? Here is the call:

    (require '[clj-http.client :as client])

    (def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")

    (client/get url)

The date has to be changed to today's date for the data to stream. Thanks!

3 Answers


To just write the stream to a file, a simple approach is clojure.java.io/copy, which takes an input stream, such as the one returned by (:body (client/get some-url {:as :stream})), and an output stream, and copies from one to the other. Something like:

(ns http-stream
  (:require [clj-http.client :as client]
            [clojure.java.io :as io]))


(with-open [in-stream (:body (client/get "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt" {:as :stream}))
            out-stream (->> "streamoutput.txt"
                        io/as-file
                        io/output-stream)]
  (io/copy in-stream out-stream))

That gave me several thousand lines of tab-separated values over a couple of seconds. To process them line by line with core.async, we probably want to wrap the stream in a reader and use line-seq:

(ns http-stream
  (:require [clj-http.client :as client]
            [clojure.core.async :as async]
            [clojure.java.io :as io]
            [clojure.string :as str]))


(defn trades-chan
  "Open the URL as a stream of trades information. Return a channel of the trades, represented as strings."
  [dump-url]
  (let [lines (-> dump-url
                  (client/get {:as :stream})
                  :body
                  io/reader
                  line-seq)] ;; A lazy seq of each line in the stream.
    (async/to-chan lines))) ;; Return a channel which outputs the lines.

;; Example: print the first 250 lines.
(let [a (trades-chan "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")]
  (async/go-loop [takes 250]
    (when (pos? takes)
      (println (async/<! a))
      (recur (dec takes)))))

With this you have a working start. I notice, though, that the stream always begins with a header line naming the columns:

time    price   quantity    board   source  buyer   seller  initiator

You can use that to improve things a little. In particular, it is enough information to build a transducer for trades-chan that turns each trade into a more convenient format to work with, such as a map. We also likely want a way to stop taking elements and close the connection at some point. I'm not that familiar with core.async myself, but this seems to work:

(defn trades-chan
  "Open the URL as a tab-separated values stream of trades.
  Returns a core.async channel of the trades, represented as maps.
  Closes the HTTP stream when the channel is closed or the stream ends."
  [dump-url]
  (let [stream (-> dump-url
                   (client/get {:as :stream})
                   :body)
        lines  (-> stream
                   io/reader
                   line-seq) ;; A lazy seq of each line in the stream.
        fields (map keyword (str/split (first lines) #"\t")) ;; (:time :price :quantity ...)
        ;; A transducer that splits each line on tabs and zips the values with fields.
        transducer (map (comp #(zipmap fields %) #(str/split % #"\t")))
        output-chan (async/chan 50 transducer)]
    ;; Reading lines blocks on network I/O, so use a real thread rather than a go block.
    (async/thread
      (loop [my-lines (rest lines)]
        (if (and (seq my-lines)                            ;; nil once the stream has ended
                 (async/>!! output-chan (first my-lines))) ;; false once the chan is closed
          (recur (rest my-lines))
          (do (async/close! output-chan)
              (.close stream)))))
    output-chan))
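To see what the transducer does without touching the network, here is a minimal offline sketch. It is an illustration under assumptions: `line->map-xf` is a hypothetical helper name, and the sample rows are made up (only the first three column names are taken from the header shown above).

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])

;; Hypothetical sample data: a header line followed by two made-up trades.
(def sample-lines
  ["time\tprice\tquantity"
   "093000\t127.5\t100"
   "093001\t127.6\t250"])

(defn line->map-xf
  "Hypothetical helper: builds a transducer that turns tab-separated lines
  into maps keyed by the column names found in `header`."
  [header]
  (let [fields (map keyword (str/split header #"\t"))]
    (map #(zipmap fields (str/split % #"\t")))))

;; Attach the transducer to a channel, as trades-chan does.
(def parsed
  (let [ch (async/chan 10 (line->map-xf (first sample-lines)))]
    (doseq [line (rest sample-lines)]
      (async/>!! ch line))            ; the buffer of 10 is large enough not to block
    (async/close! ch)
    (async/<!! (async/into [] ch))))  ; collect everything into one vector
```

All values stay strings, so prices and quantities still need parsing. Also note that str/split drops trailing empty strings, so a row whose last columns are blank yields a map without those keys.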

1 Comment

This hangs for me. Calling (trades-chan url) freezes my REPL.

I think user1571406's answer is reasonable and gives a good introduction to combining clj-http with core.async. However, if you are not tied to clj-http, I would strongly recommend the http-kit library, which is designed more for asynchronous response handling. Using http-kit, you can write your callback as follows.

user> (require '[clojure.java.io :as io]
               '[org.httpkit.client :as h])
nil

user> (def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
#'user/url

user> (h/get url {:as :stream}
             (fn [{:keys [status body]}]
               (if (= status 200)
                 (with-open [out (io/output-stream "/tmp/output.txt")]
                   (io/copy body out)))))
#<core$promise$reify__6363@373b22df: :pending>

The last h/get function call returns immediately, and its callback fn writes the body of the response to the file /tmp/output.txt asynchronously.
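For comparison, a callback style is also possible with clj-http by reading the streamed body on a background thread. This is a minimal sketch and not part of the answer above: `each-line` is a hypothetical helper, and an in-memory reader stands in for the HTTP body so the example runs offline.

```clojure
(require '[clojure.java.io :as io])

(defn each-line
  "Hypothetical helper: calls (f line) for every line read from `source`
  on a background thread. Returns a future that completes at end of input."
  [source f]
  (future
    (with-open [rdr (io/reader source)]
      (doseq [line (line-seq rdr)]
        (f line)))))

;; Offline demonstration: collect each line into an atom.
(def seen (atom []))

;; Dereferencing the future waits until the reader is exhausted.
@(each-line (java.io.StringReader. "a\nb\nc") #(swap! seen conj %))
```

With clj-http, `source` would be (:body (client/get url {:as :stream})). In that case the call returns the future immediately, and on a never-ending stream you would not dereference it.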

1 Comment

AFAIK http-kit buffers the entire output (and they recommend clj-http for non-buffered streams).
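
(ns asyncfun.core
  (:require [clojure.core.async :as async
             :refer [<! >!! go chan]]
            [clj-http.client :as client]))

(def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")

(def out-chan (chan))

;; Print whatever arrives on the channel.
(go (println (<! out-chan)))

;; {:as :stream} makes client/get return the response without reading the
;; never-ending body first; the response's :body is then an InputStream.
(>!! out-chan (client/get url {:as :stream}))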

I put this code together in a couple minutes. I think core.async is what you are looking for.
