We have a Java application that consumes Kafka messages using org.apache.kafka.clients.consumer.KafkaConsumer.
We have created a Spring Boot application with a Spring-Kafka dependency, but we are unable to read the messages within the new project. We have checked the obvious parameters, including the hostname and port of the bootstrap servers (which the logs show are recognized), the group, the topic, and that Spring Boot, like the original consumer, uses StringDeserializer. Here is our configuration file:
spring:
  kafka:
    bootstrap-servers: hostname1:9092,hostname2:9092
    consumer:
      auto-offset-reset: earliest
      group-id: our_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1
kafka:
  topic:
    boot: topic.name
and the receiver:
@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.boot}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        latch.countDown();
    }
}
Here is the code to start the Boot application:
@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
    }
}
This exception is being caught:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Is there anything obvious I have overlooked? What is the best way to debug this?
Thanks
topic.name? Receiver class in the same package (or child package) as the boot app? @KafkaListener is enabled by @EnableKafka which boot should auto configure. Get an autoconfiguration report. Turn on DEBUG logging.
@EnableKafka? and also try creating a bean of ConsumerFactory as mentioned here codenotfound.com/spring-kafka-consumer-producer-example.html
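One further thing that may be worth ruling out while debugging: a metadata timeout can also occur when the brokers are simply not reachable from the machine running the Boot application. The following is a minimal sketch, using only the JDK, that parses a bootstrap-servers string like the one in the configuration above and tries to open a TCP connection to each entry. The class name BootstrapCheck, its method names, and the 3000 ms timeout are made up for illustration and are not part of Kafka or Spring.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
public class BootstrapCheck {

    // Splits a "host1:port1,host2:port2" list into unresolved socket addresses.
    public static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing port in: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // No DNS lookup happens here; resolution is deferred to isReachable.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection can be opened within timeoutMs.
    // DNS failures, refused connections, and timeouts all count as unreachable.
    public static boolean isReachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(
                    new InetSocketAddress(address.getHostString(), address.getPort()),
                    timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default taken from the configuration in the question; override via args[0].
        String servers = args.length > 0 ? args[0] : "hostname1:9092,hostname2:9092";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 3000));
        }
    }
}
```

A successful connection here only shows that the bootstrap addresses accept TCP connections. The brokers return their advertised addresses in the metadata response, and those may differ from the bootstrap entries, so DEBUG logging as suggested above is still the more complete check.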