
This is my first time implementing stream-processing infrastructure, and my poison of choice was Storm 1.0.1, Kafka 0.9.0 and Clojure 1.5.

Now, I have a background working with a messaging system (RabbitMQ), and I liked it for a couple of reasons:

  1. Simple to install and maintain
  2. Nice frontend web portal
  3. Persistent message state is maintained, so I can start a consumer and it knows which messages have not yet been consumed, i.e. "exactly once"

However, it cannot achieve the throughput I desire.

Now, having gone through Kafka, I see it depends heavily on manually maintaining offsets (in the Kafka broker itself, in ZooKeeper, or externally).
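
For example, with the plain Kafka 0.9 consumer API you disable auto-commit and commit offsets yourself once a batch has been processed. A hedged interop sketch (the broker address, group id and topic are placeholders):

(import '[org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecord])

(def consumer
  (KafkaConsumer.
   (doto (java.util.Properties.)
     (.put "bootstrap.servers" "127.0.0.1:9092") ; placeholder broker
     (.put "group.id" "demo-group")              ; placeholder group id
     (.put "enable.auto.commit" "false")         ; we manage offsets ourselves
     (.put "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer")
     (.put "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"))))

(.subscribe consumer ["test"])
(let [records (.poll consumer 1000)]
  (doseq [^ConsumerRecord r records]
    (println (.offset r) (.value r)))
  ;; committing only after the batch is processed gives at-least-once
  (.commitSync consumer))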

I at long last managed to create a spout in Clojure with the Kafka broker as the source, which was a nightmare.

Now, as in most scenarios, what I desire is "exactly once" messaging, and the Kafka documentation states:

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.

What does this translate to for a Clojure Kafka spout? I'm finding it hard to conceptualize.

I may have several bolts along the way, but the end point is a Postgres cluster. Am I to store the offset in the database (sounds like a race hazard waiting to happen) and, on initialization of my Storm cluster, fetch the offset from Postgres?
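
Conceptually, one way to do it is to write each result and the offset it came from in a single database transaction, so a replayed tuple can be detected and skipped. A minimal sketch, assuming clojure.java.jdbc and Postgres 9.5+ (for ON CONFLICT); the db spec and table names are hypothetical, and consumer_offsets is assumed to have a primary key on (topic, kafka_partition):

(require '[clojure.java.jdbc :as jdbc])

(def db {:subprotocol "postgresql"
         :subname     "//127.0.0.1:5432/streamdb" ; hypothetical database
         :user        "storm"
         :password    "secret"})

(defn persist-with-offset!
  "Writes a processed row and the Kafka offset it came from in one
  transaction. A replayed tuple fails the offset check and becomes a
  no-op, turning at-least-once delivery into exactly-once at the sink."
  [row topic partition offset]
  (jdbc/with-db-transaction [tx db]
    (let [[{committed :last_offset}]
          (jdbc/query tx ["SELECT last_offset FROM consumer_offsets
                           WHERE topic = ? AND kafka_partition = ?
                           FOR UPDATE"
                          topic partition])]
      (when (or (nil? committed) (< committed offset))
        (jdbc/insert! tx :word_counts row) ; hypothetical sink table
        (jdbc/execute! tx ["INSERT INTO consumer_offsets
                              (topic, kafka_partition, last_offset)
                            VALUES (?, ?, ?)
                            ON CONFLICT (topic, kafka_partition)
                            DO UPDATE SET last_offset = EXCLUDED.last_offset"
                           topic partition offset])))))

The FOR UPDATE row lock serializes concurrent writers for the same partition, which is what addresses the race-hazard worry. Note that the storm-kafka spout keeps its own offsets in ZooKeeper, so the per-row check above is what actually protects the sink.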

Also, is there any danger in setting the parallelism for my Kafka spout to a number greater than one?

I generally used this as a starting point, as examples for many things are just not available in Clojure, with a few minor tweaks for the version I am using. (My messages don't quite come out as I expect, but at least I can see them.)

(import '[org.apache.storm.kafka SpoutConfig ZkHosts StringScheme]
        '[org.apache.storm.spout SchemeAsMultiScheme]
        '[java.util UUID])

(def ^{:private true
       :doc "kafka spout config definition"}
  spout-config
  ;; args: ZK hosts, topic, ZK root for offset storage, consumer id
  ;; (a random UUID here means offsets are not resumed across restarts)
  (let [cfg (SpoutConfig. (ZkHosts. "127.0.0.1:2181") "test" "/broker"
                          (.toString (UUID/randomUUID)))]
    ;; (set! (. cfg scheme) (StringScheme.)) is deprecated; the scheme
    ;; must now be wrapped in a SchemeAsMultiScheme
    (set! (. cfg scheme) (SchemeAsMultiScheme. (StringScheme.)))
    ;; (.forceStartOffsetTime cfg -2) would force reading from the earliest offset
    cfg))
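
For context, presumably the my-kafka-spout referenced below is just the stock storm-kafka KafkaSpout built from this config (a one-line sketch, assuming the org.apache.storm.kafka.KafkaSpout class):

(import '[org.apache.storm.kafka KafkaSpout])

;; the spout instance referenced as my-kafka-spout in mk-topology below
(def my-kafka-spout (KafkaSpout. spout-config))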



(defn mk-topology []
  (topology
   ;; spouts
   {;;"1" (spout-spec sentence-spout)
    "1" (spout-spec my-kafka-spout :p 1)
    "2" (spout-spec (sentence-spout-parameterized
                     ["the cat jumped over the door"
                      "greetings from a faraway land"])
                    :p 2)}
   ;; bolts
   {"3" (bolt-spec {"1" :shuffle}
                   split-sentence
                   :p 5)
    "4" (bolt-spec {"3" ["word"]}
                   word-count
                   :p 1)}))

1 Answer


With any distributed system it's impossible to ensure that a given piece of work will be worked on exactly once. At some point something will fail, and it will either need to be retried (this is called "at least once" processing) or not retried (this is called "at most once" processing); you can't split the difference and get true "exactly once" processing. What you can get is very close to exactly-once processing.

The trick is, at the end of your process, to throw out the second copy if you find that the work was done twice. This is where the index (the Kafka offset) comes in. When you are saving a result into the database, check whether work with a later index than the index on this work has already been saved. If that later work exists, throw this work out and don't save it. As for the documentation, that's the kind of explanation that's only "straightforward" to people who have done it many times...
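
One way to realize that "throw out the second copy" rule is to key each result row on the offset that produced it, letting the database itself discard duplicates. A minimal sketch, assuming clojure.java.jdbc and Postgres 9.5+ for ON CONFLICT (table and column names are hypothetical):

(require '[clojure.java.jdbc :as jdbc])

(defn save-once!
  "Inserts a result keyed by (topic, partition, offset). A replayed
  tuple produces the same key, so the duplicate is dropped by the
  primary key instead of by application logic."
  [db {:keys [topic partition offset word n]}]
  (jdbc/execute! db
    ["INSERT INTO word_counts (topic, kafka_partition, kafka_offset, word, n)
      VALUES (?, ?, ?, ?, ?)
      ON CONFLICT (topic, kafka_partition, kafka_offset) DO NOTHING"
     topic partition offset word n]))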


1 Comment

this is insightful
