I'm having trouble getting my Kafka / Confluent Spring Boot project (built with Gradle) up and running. I originally had just a producer in this test project and everything ran well. I then added a Kafka consumer, and now I get an exception on startup. Can anyone spot the problem?

First, this is the stack trace:

2021-01-22 19:56:08.566  WARN 61123 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573  INFO 61123 --- [           main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575  INFO 61123 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-22 19:56:08.584  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
2021-01-22 19:56:08.586  INFO 61123 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2021-01-22 19:56:08.597  INFO 61123 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-01-22 19:36:06.216 ERROR 61013 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at com.test.DevlyAuthServiceApplication.main(DevlyAuthServiceApplication.java:39)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:358)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:326)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:302)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:269)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:243)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:639)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:305)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 14 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/IsolationLevel
    at io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor.configure(MonitoringConsumerInterceptor.java:46)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:376)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:436)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:417)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:404)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:711)
    ... 28 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.IsolationLevel
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 34 common frames omitted

This is my build.gradle:

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
    id "com.commercehub.gradle.plugin.avro" version "0.21.0"
    id "idea"

}
group 'com.test.tge-auth-service'
version '1.0'

java {
    sourceCompatibility = JavaVersion.VERSION_14
    targetCompatibility = JavaVersion.VERSION_14
}

ext {
    avroVersion = "1.10.1"
}

repositories {
    mavenCentral()
    jcenter()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

avro {
    createSetters = true
    fieldVisibility = "PRIVATE"
}

dependencies {
//    providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'

    compile group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '0.5.2'
    compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.860'

    compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '3.0.0'
    compile group: 'io.springfox', name: 'springfox-boot-starter', version: '3.0.0'

    compile('org.springframework.boot:spring-boot-starter-data-elasticsearch')
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.data', name: 'spring-data-elasticsearch', version: '4.0.4.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-security', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.security', name: 'spring-security-oauth2-client', version: '5.4.0'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: '2.4.2'
    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.6.5'

    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.2'

    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.21'

    compile group: 'io.jsonwebtoken', name: 'jjwt', version: '0.9.1'
    compile group: 'org.openapitools', name: 'jackson-databind-nullable', version: '0.2.1'

    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'org.apache.commons', name: 'commons-collections4', version: '4.4'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
    compile group: 'org.passay', name: 'passay', version: '1.6.0'
    compile group: 'com.google.guava', name: 'guava', version: '30.0-jre'

    compile group: 'io.confluent', name: 'kafka-schema-registry-client', version: '5.4.0'
    compile group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.0'
    compile group: 'io.confluent', name: 'monitoring-interceptors', version: '5.4.0'
    compile(group: 'io.confluent', name: 'kafka-streams-avro-serde', version:'5.4.0') {
        exclude(module: 'log4j-over-slf4j')
    }

    compile "org.apache.avro:avro:1.10.1"
    implementation "org.apache.avro:avro:${avroVersion}"

    compileOnly 'org.projectlombok:lombok:1.18.12'
    annotationProcessor 'org.projectlombok:lombok:1.18.12'

    implementation 'com.amazonaws:aws-java-sdk-s3'
    implementation 'org.springframework.boot:spring-boot-starter-web'

    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompileOnly 'org.projectlombok:lombok:1.18.12'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.12'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }

    jar {
        manifest {
            attributes(
                    'Main-Class': 'com.test.SpringBootPersistenceApplication'
            )
        }
        from {
            configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
        }
    }
}

Here is my producer, which works:

import com.test.messages.avro.model.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Producer Logger")
@RequiredArgsConstructor
public class ProducerK {

  @Value("${topic.name}")
  private String TOPIC;

  private final KafkaTemplate<String, User> kafkaTemplate;

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.TOPIC, "key", user);
    log.info(String.format("Produced user -> %s", user));
  }
}

And finally here is my Consumer:

import com.test.messages.avro.model.User;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {

    @KafkaListener(
            topics = "#{'${topic.name}'}",
            groupId = "simple-consumer"
    )
    public void consume(User record) throws IOException {
        log.info(String.format("Consumed message -> %s", record));
    }
}

If it's of any help, here is my application.yaml file as well:

topic:
  name: users-2kb
  partitions-num: 3
  replication-factor: 1
spring:
  kafka:
    properties:
      bootstrap.servers: localhost:9092
      schema.registry.url: http://localhost:8081
    consumer:
      group-id: my-microservice
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    template:
      default-topic:
logging:
  level:
    root: info

Thank you for any help you can provide (I'm going out of my mind with this :D)

1 Answer

Boot 2.3 uses spring-kafka 2.5 by default (and kafka-clients 2.5.0). Since you have overridden its prescribed spring-kafka version to 2.6.5, you must override all of the Kafka dependencies to match: kafka-clients 2.6.1 and, if you are using it, kafka-streams 2.6.1.
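
A minimal sketch of what that override could look like in the question's build.gradle, assuming the `io.spring.dependency-management` plugin stays applied (it is in the question). The `ext` property names `spring-kafka.version` and `kafka.version` are the ones Spring Boot's dependency management reads; the version numbers are simply those named in this answer. This only aligns spring-kafka with the Kafka client jars; the Confluent artifacts still need a release built for the same Kafka line.

```groovy
// Sketch for a Boot 2.3.x build: override the managed versions in one place
// instead of pinning a version on a single artifact.
ext['spring-kafka.version'] = '2.6.5'
ext['kafka.version'] = '2.6.1'   // applies to kafka-clients, kafka-streams, etc.

dependencies {
    // No explicit versions here: they come from the overridden properties above.
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.apache.kafka:kafka-streams'   // only if you use Kafka Streams
}
```

To see which kafka-clients version actually ends up on the classpath, `gradle dependencyInsight --dependency kafka-clients --configuration runtimeClasspath` prints the selected version and the reason it was chosen.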

If you are using the embedded Kafka broker in tests, there are a bunch of other jars you need. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#update-deps

Boot 2.4 uses spring-kafka 2.6.x and will bring in all the right versions.

Confluent 5.4 uses Kafka 2.4.

You should use the version of Confluent that matches Spring Boot's prescribed versions of spring-kafka and kafka-clients.

If you use Boot 2.4.x, use Confluent 6.0.

https://docs.confluent.io/platform/current/installation/versions-interoperability.html
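
As a rough illustration of the Boot 2.4.x route, the relevant parts of build.gradle might look like the sketch below. It is an assumption-laden fragment, not a drop-in file: `confluentVersion` is a hypothetical property name, and `6.0.1` stands in for whichever 6.0.x patch release you choose. The point is that spring-kafka carries no explicit version, so Boot manages it together with kafka-clients, and all Confluent artifacts share one version.

```groovy
plugins {
    id 'org.springframework.boot' version '2.4.2'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

ext {
    // Hypothetical property; the patch version is an assumption, any 6.0.x should line up.
    confluentVersion = '6.0.1'
}

repositories {
    mavenCentral()
    maven { url 'https://packages.confluent.io/maven/' }
}

dependencies {
    // Version intentionally omitted: Boot 2.4.x selects spring-kafka and kafka-clients.
    implementation 'org.springframework.kafka:spring-kafka'

    // Keep every Confluent artifact on the same platform version.
    implementation "io.confluent:kafka-schema-registry-client:${confluentVersion}"
    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"
    implementation "io.confluent:monitoring-interceptors:${confluentVersion}"
}
```

The `NoClassDefFoundError` in the question is thrown from inside `MonitoringConsumerInterceptor.configure`, which is consistent with the interceptor jar expecting a different kafka-clients than the one on the runtime classpath; keeping the two version lines in step is what this layout is meant to guarantee.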

7 Comments

Gary, thank you for taking the time to respond. Sorry, are you saying I need to match my Spring Boot version to the Kafka ones?
Or would upgrading to Spring Boot "2.4.2" solve the issue? It's only a test project for me to learn, so updating isn't an issue. Thank you once again!
Actually, I just tried that and it's still giving the exception.
Also worth pointing out: the Confluent dependencies each require specific Kafka client versions as well.
docs.confluent.io/platform/current/installation/… Looks like you want Boot 2.4.2 and confluent platform 6.0 - they both use kafka-clients 2.6.x.