2

I came across this link for implementing the KafkaConsumer in Java. The below listed code reads the stream and processes the messages. Will this code keep listening to incoming messages once it is started? If so , how do it keep running and keep consuming the messages?

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}
3
  • 1
    I wonder why they decided to call it Kafka. Commented Feb 23, 2016 at 0:14
  • 1
    Kafka has its origins in the Czech language. The meaning of Kafka is 'resembling a bird'. Maybe a "dove" which passes message from source to destination ;) Commented Feb 23, 2016 at 5:06
  • How interesting! I speak Czech a little (very little) and I had no idea, and it's not in my dictionary. Commented Feb 23, 2016 at 6:33

1 Answer 1

2

The code will keep listening to incoming message once it is started since it use a ThreadPool (Number of thread in the threadpool = a_numThreads) and each of the thread in the threadpool execute a Consumer (ConsumerTest).

If you look closely at the ConsumerTest class, you'll see that there is an infinite loop. The it.hasNext() is blocking (see ConsumerIterator) so the consumer will always wait for the next message :

public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
}

With that said, it is possible to set a timeout with the property consumer.timeout.ms that Throw a timeout exception to the consumer if no message is available for consumption after a specified interval of time, but the default value is -1 (no timeout) so by default, consumer will always keep consuming messages till the consumer and the executor is shutdown (See the shutdown method in the example).

Sign up to request clarification or add additional context in comments.

2 Comments

Thanks Jean! Is the iterator an implementation provided by Kafka which causes the program to block and wait for next message?
Exactly, the iterator is implemented in Kafka and is backed by a java BlockingQueue. "An iterator that blocks until a value can be read from the supplied queue. The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown" (github.com/kafka-dev/kafka/blob/master/core/src/main/scala/…)

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.