3

In my simple application I am trying to instantiate a KafkaConsumer. My code is nearly a copy of the code from the Javadoc ("Automatic Offset Committing"):

@Slf4j
public class MyKafkaConsumer {

    public MyKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe( Arrays.asList("mytopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                log.info( record.offset() + record.key() + record.value() );
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

If I try to instantiate this, I get:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
        at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
        ...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
        ... 48 more

How can I fix this?

4
  • How did you solve this? Commented Jul 17, 2018 at 19:54
  • I am not sure. The code that works now is the same as in my question, so it was probably a dependency problem. I now have this one: 'org.apache.kafka:kafka-clients:1.0.0' Commented Jul 18, 2018 at 7:26
  • Thanks. For me too, it was a problem with a transitive dependency. Commented Jul 18, 2018 at 18:57
  • You could add this as an answer and I'll accept it. Commented Jul 19, 2018 at 7:47

3 Answers

2

This might be a problem with Kafka classloading.
Setting the thread's context classloader to null while the consumer is constructed might help.

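...
Thread currentThread = Thread.currentThread();
ClassLoader savedClassLoader = currentThread.getContextClassLoader();

// With no context classloader set, Kafka falls back to the classloader
// that loaded its own classes when it resolves the deserializer names.
currentThread.setContextClassLoader(null);
KafkaConsumer<String, String> consumer;
try {
    consumer = new KafkaConsumer<>(props);
} finally {
    // Restore the original classloader even if construction fails.
    currentThread.setContextClassLoader(savedClassLoader);
}
...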

A full explanation is here:
https://stackoverflow.com/a/50981469/1673775
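
The save, swap, and restore steps above can also be wrapped in a small reusable helper. This is only a sketch: the class name `ContextClassLoaderSwap` and the method `callWith` are hypothetical names chosen for illustration and are not part of Kafka or the JDK.

```java
import java.util.function.Supplier;

public class ContextClassLoaderSwap {

    /** Runs the action with the given context classloader, then restores the previous one. */
    public static <T> T callWith(ClassLoader temporary, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        thread.setContextClassLoader(temporary);
        try {
            return action.get();
        } finally {
            // Restore even if the action throws, so later code is not affected.
            thread.setContextClassLoader(saved);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader during = callWith(null, () -> Thread.currentThread().getContextClassLoader());
        System.out.println("during is null: " + (during == null));
        System.out.println("restored: " + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

With this helper, the consumer could be created as `ContextClassLoaderSwap.callWith(null, () -> new KafkaConsumer<String, String>(props))`, assuming `props` is configured as in the question.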


1

Not sure if this is what finally fixed your error, but note that when using spring-kafka-test (version 2.1.x, starting with version 2.1.5) with the 1.1.x kafka-clients jar, you will need to override certain transitive dependencies as follows:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

So it could well have been a problem with a transitive dependency.
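
To check whether conflicting jars or classloaders are involved, it can help to print where each class was actually loaded from. The following is a minimal diagnostic sketch using only JDK APIs; the class name `ClassOriginCheck` and the method `describe` are hypothetical names for illustration.

```java
import java.security.CodeSource;

public class ClassOriginCheck {

    /** Describes which classloader and which jar or directory a class came from. */
    public static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "(unknown, e.g. a JDK class)"
                : source.getLocation().toString();
        return type.getName() + " loaded by "
                + (loader == null ? "bootstrap" : loader.toString())
                + " from " + location;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names as arguments; defaults to a JDK class.
        String[] names = args.length > 0 ? args : new String[] {"java.lang.String"};
        for (String name : names) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

Run inside the affected application with `org.apache.kafka.common.serialization.StringDeserializer` and `org.apache.kafka.common.serialization.Deserializer` as arguments. If the two lines report different jars or different classloaders, that would be consistent with the "is not an instance of" error in the question.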


-1

Your custom class needs to implement org.apache.kafka.common.serialization.Deserializer, for example:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.Serializable;
import java.util.Map;

//Developed by Arun Singh
public class Employee implements Serializable, Serializer<Employee>, Deserializer<Employee> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure. Overridden explicitly because both
        // Serializer and Deserializer declare this method.
    }

    @Override
    public byte[] serialize(String topic, Employee employee) {
        try {
            return new ObjectMapper().writeValueAsBytes(employee);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Employee deserialize(String topic, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        Employee employee = null;
        try {
            // Pass the raw bytes: bytes.toString() would return the array's
            // identity string, not the JSON payload.
            employee = mapper.readValue(bytes, Employee.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return employee;
    }

    @Override
    public Employee deserialize(String topic, Headers headers, byte[] data) {
        // Headers are not used here, so delegate to the two-argument overload.
        return deserialize(topic, data);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
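
One detail in the deserialize methods is easy to get wrong: calling `toString()` on a `byte[]` does not decode its contents, so the payload has to be passed to the mapper as raw bytes or decoded with an explicit charset. A small self-contained sketch of the difference follows; the class name `ByteDecodingDemo` and the sample JSON are made up for illustration, and UTF-8 is an assumption about the payload encoding.

```java
import java.nio.charset.StandardCharsets;

public class ByteDecodingDemo {

    /** Decodes a UTF-8 payload, as a record value would arrive in deserialize(...). */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"name\":\"Ada\"}".getBytes(StandardCharsets.UTF_8);
        // Object.toString(): a type tag plus an identity hash that varies per run.
        System.out.println(payload.toString());
        // The actual JSON text carried by the bytes.
        System.out.println(decode(payload));
    }
}
```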
