This is my first time implementing stream processing infrastructure, and my poison of choice was Storm 1.0.1, Kafka 0.9.0 and Clojure 1.5.
I have a background working with a messaging system (RabbitMQ), which I liked for a few reasons:
- Simple to install and maintain
- Nice frontend web portal
- Persistent message state is maintained, so I can start a consumer and it knows which messages have not been consumed, i.e. "exactly once"
However, it cannot achieve the throughput I need.
Having gone through Kafka, I see that it depends heavily on manually maintaining offsets (internally in the Kafka broker, in ZooKeeper, or externally).
I at long last managed to create a spout in Clojure with the Kafka broker as its source, which was a nightmare.
As in most scenarios, what I want is "exactly-once messaging", and the Kafka documentation states:
So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
What does this translate to for a Clojure Kafka spout? I am finding it hard to conceptualize.
I may have several bolts along the way, but the end point is a Postgres cluster. Am I to store the offset in the database (sounds like a race hazard waiting to happen) and fetch it from Postgres when my Storm cluster initializes?
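To make the question concrete, here is a minimal sketch of what I imagine "storing the offset with the data" would look like. It is only an in-memory model, not a working Storm or Postgres integration: the atom `sink` stands in for the database, and the names `apply-message`, `process!`, and the message keys `:part`, `:offset` and `:word` are all made up for illustration. It assumes messages from a given partition reach the sink in offset order, and that the real version would write the data and the offset in one database transaction.

```clojure
;; In-memory stand-in for the Postgres sink (hypothetical; a real version
;; would update both maps inside a single database transaction).
(def sink (atom {:offsets {}    ; partition -> last offset applied
                 :counts  {}})) ; word -> count

(defn apply-message
  "Returns the sink state after applying one message. A message whose offset
  is not beyond the stored offset for its partition is treated as a replay
  and skipped, so redelivery does not double count."
  [state {:keys [part offset word]}]
  (let [last-applied (get-in state [:offsets part] -1)]
    (if (<= offset last-applied)
      state
      (-> state
          (update-in [:counts word] (fnil inc 0))
          (assoc-in [:offsets part] offset)))))

(defn process!
  "Applies the data change and the offset change as one atomic step."
  [msg]
  (swap! sink apply-message msg))

;; Usage: the third message is a redelivery of offset 1 and is ignored.
(doseq [m [{:part 0 :offset 0 :word "cat"}
           {:part 0 :offset 1 :word "cat"}
           {:part 0 :offset 1 :word "cat"}
           {:part 1 :offset 0 :word "door"}]]
  (process! m))

@sink
;; => {:offsets {0 1, 1 0}, :counts {"cat" 2, "door" 1}}
```

Because the count and the offset change together in one step, a replayed message cannot be counted twice in this model. What I cannot see is how to carry the partition and offset through several bolts so the final write can do this.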
Also, is there any danger in setting the parallelism for the Kafka spout to a number greater than one?
I generally used this as a starting point, as examples for many things are just not available in Clojure, with a few minor tweaks for the version I am using. (My messages don't quite come out as I expect, but at least I can see them.)
(def ^{:private true
       :doc "kafka spout config definition"}
  spout-config
  (let [cfg (SpoutConfig. (ZkHosts. "127.0.0.1:2181") "test" "/broker" (.toString (UUID/randomUUID)))]
    ;; (set! (. cfg scheme) (StringScheme.)) ; deprecated
    (set! (. cfg scheme) (SchemeAsMultiScheme. (StringScheme.)))
    ;; (.forceStartOffsetTime cfg -2)
    cfg))
(defn mk-topology []
  (topology
   {;; "1" (spout-spec sentence-spout)
    "1" (spout-spec my-kafka-spout :p 1)
    "2" (spout-spec (sentence-spout-parameterized
                     ["the cat jumped over the door"
                      "greetings from a faraway land"])
                    :p 2)}
   {"3" (bolt-spec {"1" :shuffle}
                   split-sentence
                   :p 5)
    "4" (bolt-spec {"3" ["word"]}
                   word-count
                   :p 1)}))