
I am trying to build a simple Spring Boot Kafka consumer to read messages from a Kafka topic, but no messages are consumed because the @KafkaListener method is never triggered.

Other answers suggested making sure that AUTO_OFFSET_RESET_CONFIG is set to "earliest" and that GROUP_ID_CONFIG is unique. I did both, but the @KafkaListener method is still not triggered. The application simply starts and does nothing:

Application Started

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.4.RELEASE)

This is a subproject of another Gradle project. The build.gradle for this subproject is below (MAIN_CLASS_PATH is set correctly in the real code):

apply plugin: 'application'

mainClassName = <MAIN_CLASS_PATH>

dependencies {
    compile "org.springframework.kafka:spring-kafka:${SpringKafkaVersion}"
    compile "org.springframework.boot:spring-boot-starter:${SpringBootVersion}"
    compile group: 'org.springframework', name: 'spring-tx', version: '5.2.2.RELEASE'
}

Java classes:

Start.java:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        try {
            System.out.println("Application Started");
            SpringApplication.run(Start.class, args);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

KafkaConsumerConfig.java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.UUID;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                <KAFKA_SERVER_ADDRESS>);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                UUID.randomUUID().toString());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

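KafkaConsumer.java:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service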
public class KafkaConsumer {

    @KafkaListener(topics = <TOPIC_NAME>)
    public void consume(@Payload ConsumerRecord<String, String> message) {
        System.out.println("Consumed message: " + message);
    }
}

KAFKA_SERVER_ADDRESS and TOPIC_NAME are set correctly in my code, and I have checked that the topic already contains messages.

Any ideas why this does not consume any messages from the Kafka topic?

  • It looks fine, although you really don't need to define a consumer or container factory - Boot will auto-configure them from the application properties (or yaml). Does enabling DEBUG logging provide any help? Commented Jan 6, 2020 at 18:48
  • Hi Gary, as per your suggestion I added DEBUG level logging. Commented Jan 6, 2020 at 20:13
  • Did it help? Is the KafkaConsumer class in the same package (or sub-package) as Start? Commented Jan 6, 2020 at 20:18
  • Don't put stuff like that in comments; it doesn't render well; edit the question instead, with a comment to say you have done so (we don't get notifications of edits). You should see an INFO log with something like ktest24: partitions assigned: [ktest24-0, ktest24-9, .... With DEBUG logging, you should see Received: 0 records and Commit list: {} every 5 seconds (by default) if there is nothing to fetch. If you don't see those, something is stuck and you'll have to take a thread dump to see what the consumer thread is doing. Commented Jan 6, 2020 at 21:06
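
The thread dump mentioned in the last comment is normally taken with the JDK's jstack tool, but it can also be done from inside the application. Below is a minimal sketch using only the JDK; the class name ConsumerThreadDump and the method dump are made up for illustration. Spring Kafka listener threads usually have a name containing KafkaListenerEndpointContainer, but treat that fragment as an assumption and check the thread names in your own logs.

```java
import java.util.Map;

// Hypothetical helper (not part of Spring Kafka): prints stack traces of
// live threads whose name contains a given fragment.
final class ConsumerThreadDump {

    // Returns one block per matching thread: name, state, then its stack frames.
    static String dump(String nameFragment) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().contains(nameFragment)) {
                continue; // not a thread we are interested in
            }
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }
}
```

Calling System.out.println(ConsumerThreadDump.dump("KafkaListenerEndpointContainer")) a minute or so after startup should show where the consumer thread is sitting, assuming the name fragment matches your thread names.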

3 Answers

1

Spring Boot and Spring Kafka provide auto-configuration. For a simple consumer, remove your configuration class and add these properties to your application.yml (or .properties):

 spring.application.name: your-app-name
 spring.kafka.consumer.group-id: your-group-id
 spring.kafka.bootstrap-servers: server1:port1,server2:port2,server3:port3

1

The problem turned out to be a Kafka connectivity issue: the machine running the consumer was not permitted to connect to the Kafka server. Whitelisting the consuming machine on the Kafka server nodes solved it.

If the Kafka server URL can be resolved but cannot be connected to because of insufficient permissions, the Kafka consumer does not log that. This is not specific to Spring Kafka; the plain kafka-clients consumer did not log it either.
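
Since the client stays silent in this situation, a quick way to rule it in or out is to try a plain TCP connection to every bootstrap server from the consuming machine. Here is a rough sketch using only the JDK; BrokerReachability, canConnect and check are names I made up, and the host names in the usage line are placeholders. A successful connect only shows that the port is reachable; it does not prove that Kafka-level authentication passes or that the brokers' advertised addresses are reachable as well.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of any Kafka library): checks raw TCP
// reachability of each "host:port" entry in a bootstrap server list.
final class BrokerReachability {

    // Attempts a TCP connect; returns false on refusal, timeout, or DNS failure.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Splits a comma-separated "host:port" list and checks every entry in order.
    static Map<String, Boolean> check(String bootstrapServers, int timeoutMillis) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas and whitespace
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.put(trimmed, canConnect(host, port, timeoutMillis));
        }
        return result;
    }
}
```

For example, BrokerReachability.check("server1:9092,server2:9092", 3000) returns a map from each entry to true or false. Any false entry points at a network or firewall problem rather than at the listener code.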


0

Add a group id to your listener: @KafkaListener(topics = "<topic-name>", groupId = "<group-id>"). You also have to set the same group id in the consumer properties.
