
I'm trying to process streaming Avro data from Kafka using Spark Structured Streaming (version 2.3.1), so I tried this example to deserialize. It works only if the topic's value part contains a StringType, but in my case the schema contains longs and integers, like below:

public static final String USER_SCHEMA = "{"
        + "\"type\":\"record\","
        + "\"name\":\"variables\","
        + "\"fields\":["
        + "  { \"name\":\"time\", \"type\":\"long\" },"
        + "  { \"name\":\"thnigId\", \"type\":\"string\" },"
        + "  { \"name\":\"controller\", \"type\":\"int\" },"
        + "  { \"name\":\"module\", \"type\":\"int\" }"
        + "]}";

It throws an exception in this UDF:

sparkSession.udf().register("deserialize", (byte[] data) -> {
    GenericRecord record = recordInjection.invert(data).get(); // throws at invert()
    return RowFactory.create(
            record.get("time"),
            record.get("thingId").toString(),
            record.get("controller"),
            record.get("module"));
}, DataTypes.createStructType(type.fields()));

saying

Failed to invert: [B@22a45e7
Caused by: java.io.IOException: Invalid int encoding.

presumably because time, controller, and module are long and int types in the schema.

I guess this is some sort of encoding/decoding mismatch on the byte[] data.
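For completeness, a setup along these lines (the Twitter Bijection pattern from the linked example) presumably sits behind recordInjection and type; the exact declarations here are an assumption, not copied from my code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Parse the Avro schema and build a binary GenericRecord codec.
Schema schema = new Schema.Parser().parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

// Matching Spark SQL struct used as the UDF's return type.
StructType type = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("time", DataTypes.LongType, false),
        DataTypes.createStructField("thingId", DataTypes.StringType, false),
        DataTypes.createStructField("controller", DataTypes.IntegerType, false),
        DataTypes.createStructField("module", DataTypes.IntegerType, false)
});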

2 Answers


Did you take a look at https://issues.apache.org/jira/browse/AVRO-1650? It talks specifically about the issue you might be running into: the default UTF-8 encoding can result in loss during the encoding/decoding process.

If you are dealing with binary-encoded data, I would also suggest using Base64 to save/transmit it, since Base64 output survives ISO-8859-1, which per the link above is the right encoding to use.
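As a rough sketch of that suggestion (assuming your bytes currently pass through a String somewhere in the pipeline, and reusing the recordInjection from the question), Base64 makes the payload safe to carry as text and reversible without loss:

import java.util.Base64;

// Producer side: wrap the binary Avro payload in Base64 before sending it as a string.
byte[] avroBytes = recordInjection.apply(record);          // binary-encoded GenericRecord
String safePayload = Base64.getEncoder().encodeToString(avroBytes);

// Consumer side: decode back to the original bytes before inverting.
byte[] restored = Base64.getDecoder().decode(safePayload);
GenericRecord decoded = recordInjection.invert(restored).get();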


1 Comment

The issue was that I tried to deserialize Confluent Avro, which carries metadata along with the Avro record; that was the problem.
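For anyone hitting the same thing: Confluent's wire format prepends a magic byte and a 4-byte schema ID to every message, so a plain Avro decoder starts reading mid-stream and fails exactly like this. A minimal sketch of stripping that 5-byte header before decoding (in practice Confluent's KafkaAvroDeserializer is the safer route):

import java.nio.ByteBuffer;
import java.util.Arrays;

// Confluent framing: [magic byte 0x0][4-byte schema registry ID][Avro binary payload]
ByteBuffer buffer = ByteBuffer.wrap(data);
if (buffer.get() != 0) {
    throw new IllegalArgumentException("Not Confluent wire format");
}
int schemaId = buffer.getInt();                  // schema registry ID (unused here)
byte[] avroPayload = Arrays.copyOfRange(data, 5, data.length);
GenericRecord record = recordInjection.invert(avroPayload).get();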

I also faced this case. I guess your Kafka value deserializer is configured as the default String deserializer; you can try changing it to org.apache.kafka.common.serialization.ByteArrayDeserializer so the raw bytes reach your code untouched.

That is my solution; hope it helps.
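A minimal sketch of that configuration on a plain Kafka consumer (broker address and group ID below are placeholders; with Spark Structured Streaming's Kafka source the value column already arrives as binary, so this mainly applies if a separate consumer sits in your pipeline):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");    // placeholder: point at your brokers
props.put("group.id", "avro-consumer");              // placeholder group ID
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
// Hand the raw bytes through untouched instead of forcing them through UTF-8.
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);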

