I am getting the following exception while reading from a Kafka topic:

    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
    Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class USERS specified in writer's schema whilst finding reader's schema for a SpecificRecord.

I think the deserialization is not configured correctly, and I have not been able to find a proper example.

    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "TEST");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, USERS> valid = builder.stream("testUSERS");

    valid.foreach((k, v) -> System.out.println("v =" + v.getUSERNAME()));
    valid.foreach((k, v) -> System.out.println("k =" + k));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);

    streams.cleanUp();

    streams.start();

1 Answer

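You have configured the Kafka Avro deserializer to use a specific reader (SPECIFIC_AVRO_READER_CONFIG set to true). In that case, the class named in the writer's schema (here USERS) must exist on your classpath so that the message can be deserialized into it. If it does not, configure the deserializer to return a GenericRecord instead. Note that in a Kafka Streams application the value serde and the stream's value type then also have to work with GenericRecord (for example GenericAvroSerde and KStream<String, GenericRecord>) rather than SpecificAvroSerde and USERS.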

    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
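
To illustrate why the class has to be on the classpath: with the specific reader enabled, the deserializer takes the full name from the writer's schema and tries to load a class with exactly that name. The sketch below is a simplified, JDK-only model of that kind of lookup, not the actual Confluent or Avro code; findReaderClass is a hypothetical helper introduced only for this illustration.

```java
import java.util.Optional;

public class ReaderClassLookup {

    // Hypothetical helper (assumption, not a library API): models resolving
    // a schema's full name to a class that is visible on the classpath.
    public static Optional<Class<?>> findReaderClass(String schemaFullName) {
        try {
            return Optional.of(Class.forName(schemaFullName));
        } catch (ClassNotFoundException e) {
            // Roughly the situation behind "Could not find class USERS ..."
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // "USERS" has no namespace, so only a top-level class named USERS
        // in the default package would match.
        System.out.println(findReaderClass("USERS").isPresent());

        // A fully qualified name that does exist resolves normally.
        System.out.println(findReaderClass("java.lang.String").isPresent());
    }
}
```

Under this model, a generated class that lives in a package (say com.example.USERS, a made-up name) would still not be found when the schema's full name is just USERS. If you want to keep the specific reader, it is worth checking that the namespace and name in the registered schema match the package and name of your generated class.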

2 Comments

I am facing the same issue. What is the solution if we use a remote schema registry?
The same solution applies. Using a remote registry does not change how the library handles the deserialization of Avro messages.
