0

I have following code:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka-topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

MyCustomClassDeserializer is implemented to convert byte array to java object.

When I run this program locally, I get error:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

And I get this for code line:

.map(new MapFunction<MyCustomClass,String>() {

Not sure why I get this?

1 Answer 1

1

So, You have a deserializer that returns POJO, yet You are telling Flink that it should deserialize record from byte[] to String by using SimpleStringSchema. See the problem now? :)

I don't think You should use the custom Kafka deserializers in FlinkKafkaConsumer in general. What You should aim for instead is to instead create a custom class that extends DeserializationSchema from Flink. It should be much better in terms of type safety and testability.

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks, got it. Actually its a legacy code, with too many such issues.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.