
I've developed a couple of C++ apps that produce and consume Kafka messages (using cppkafka) whose payloads embed Protobuf3 messages. Both work fine. The producer's relevant code is:

    std::string kafkaString;
    cppkafka::MessageBuilder *builder;
    ...
    solidList->SerializeToString(&kafkaString);
    builder->payload(kafkaString);

Protobuf objects are serialized to string and inserted as Kafka payload. Everything works fine up to this point. Now, I'm trying to develop a consumer for that in Java. The relevant code should be:

    KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(properties);
    ...
    ConsumerRecords<Long, String> records = consumer.poll(100);
    for (ConsumerRecord<Long, String> record : records) {
        SolidList solidList = SolidList.parseFrom(record.value());
        ...

but that fails at compile time: parseFrom complains: The method parseFrom(ByteBuffer) in the type Solidlist.SolidList is not applicable for the arguments (String). So, I try using a ByteBuffer:

    KafkaConsumer<Long, ByteBuffer> consumer = new KafkaConsumer<Long, ByteBuffer>(properties);
    ...
    ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);
    for (ConsumerRecord<Long, ByteBuffer> record : records) {
        SolidList solidList = SolidList.parseFrom(record.value());
        ...

Now, the error is on execution time, still on parseFrom(): Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.nio.ByteBuffer. I know it is a java.lang.String!!! So, I get back to the original, and try using it as a byte array:

    SolidList solidList = SolidList.parseFrom(record.value().getBytes());

Now, the error is at execution time: Exception in thread "main" com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException: Protocol message tag had invalid wire type.
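A side note on the earlier ClassCastException: Java generics are erased at compile time, so declaring the consumer as KafkaConsumer&lt;Long, ByteBuffer&gt; does not change what the configured deserializer actually returns at runtime; the failing cast only surfaces when the value is used. A minimal JDK-only sketch of that mechanism (no Kafka involved, all names hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // Generic type arguments are erased at runtime: the unchecked cast
    // below compiles without complaint, and the ClassCastException only
    // appears when an element is actually used as the declared type.
    @SuppressWarnings("unchecked")
    static ByteBuffer firstAsByteBuffer(List<?> values) {
        List<ByteBuffer> typed = (List<ByteBuffer>) values; // no check happens here
        return typed.get(0); // compiler-inserted checkcast fails here
    }

    public static void main(String[] args) {
        List<Object> produced = new ArrayList<>();
        produced.add("I am really a String"); // what the deserializer returned
        try {
            firstAsByteBuffer(produced);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the question");
        }
    }
}
```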

The protobuf documentation states for the C++ serialization: bool SerializeToString(string* output) const;: serializes the message and stores the bytes in the given string. Note that the bytes are binary, not text; we only use the string class as a convenient container.
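That note is the crux of the problem: if the binary payload has already been decoded as a UTF-8 String on the consumer side, getBytes() cannot recover the original bytes, because invalid UTF-8 sequences are replaced during decoding. A minimal JDK-only sketch of the lossy round trip (the byte values are hypothetical, chosen to be invalid UTF-8, as protobuf wire bytes often are):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BinaryVsText {
    // Decoding arbitrary binary data as UTF-8 and re-encoding it is lossy:
    // invalid byte sequences are replaced with U+FFFD (encoded as EF BF BD).
    static byte[] utf8RoundTrip(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical wire fragment containing bytes that are not valid UTF-8.
        byte[] raw = {0x0a, 0x03, (byte) 0x8f, (byte) 0xfe, 0x01};
        byte[] back = utf8RoundTrip(raw);
        System.out.println(Arrays.equals(raw, back)); // false: bytes were mangled
    }
}
```

This is why the payload has to stay a byte[] from end to end (e.g. via Kafka's ByteArrayDeserializer) rather than passing through a String.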

TL;DR: How should I interpret the protobuf C++ "binary bytes" in Java?

This seems related (it is the opposite) but doesn't help: Protobuf Java To C++ Serialization [Binary]

Thanks in advance.

1 Comment

  • I am new to C++ and looking for an example to test protobuf serialization using a Kafka C++ library. Would it be possible for you to share your sample code? Commented Aug 8, 2022 at 16:32

2 Answers


Try implementing a Deserializer and passing it to the KafkaConsumer constructor as the value deserializer. Note that parseFrom() throws the checked InvalidProtocolBufferException, which Deserializer.deserialize() does not declare, so it has to be wrapped. It could look like this:

    class SolidListDeserializer implements Deserializer<SolidList> {
      @Override
      public SolidList deserialize(final String topic, final byte[] data) {
        try {
          return SolidList.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
          throw new SerializationException("Failed to parse SolidList", e);
        }
      }
      ...
    }

...

    KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer());
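Alternatively, the same wiring can be done through the consumer properties instead of the constructor; a sketch, assuming SolidListDeserializer has a public no-arg constructor (Kafka instantiates deserializers reflectively when they are configured by name, and the bootstrap server and group id here are placeholders):

```java
// Config-fragment sketch: register the deserializers by class name.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "solidlist-consumer");      // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        SolidListDeserializer.class.getName());

KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props);
```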

1 Comment

Great! It works! (The deserializer is required; my mistake in a previous comment.) Thanks, that did the trick. Peace. – RodolfoAP 9 mins ago

You can read Kafka records as ConsumerRecords<Long, String> and then parse with:

    SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));

1 Comment

I think I've already tried that: it gives Exception in thread "main" com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
