
These are Avro records serialized with the Confluent platform.

I would like to find a working example like this:

https://github.com/seanpquig/confluent-platform-spark-streaming/blob/master/src/main/scala/example/StreamingJob.scala

but for Spark Structured Streaming.

 kafka
   .select("value")
   .map { row =>

     // Round-tripping locally works: test == testRehydrated
     val test = Foo("bar")
     val testBytes = AvroWriter[Foo].toBytes(test)
     val testRehydrated = AvroReader[Foo].fromBytes(testBytes)

     // ...but the same reader yields mangled Foo data for the Kafka payload
     val bytes = row.getAs[Array[Byte]]("value")
     val rehydrated = AvroReader[Foo].fromBytes(bytes)
     rehydrated
   }
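The mangling is consistent with Confluent's wire format: the serializer prefixes every Avro payload with a magic byte (0x0) and a 4-byte big-endian Schema Registry ID, so a plain Avro reader treats those 5 bytes as record data. A minimal, self-contained sketch of the framing (`stripConfluentHeader` is a hypothetical helper, not part of the question's `AvroReader`):

```scala
import java.nio.ByteBuffer

// Confluent frames each Avro payload as: magic byte (0x0), 4-byte
// big-endian schema ID, then the raw Avro-encoded record.
// Hypothetical helper: split the header off before handing the
// remaining bytes to an Avro reader.
def stripConfluentHeader(payload: Array[Byte]): (Int, Array[Byte]) = {
  val buffer = ByteBuffer.wrap(payload)
  require(buffer.get() == 0, "Not Confluent-framed Avro: unknown magic byte")
  val schemaId = buffer.getInt()                 // 4-byte Schema Registry ID
  val avroBytes = new Array[Byte](buffer.remaining())
  buffer.get(avroBytes)
  (schemaId, avroBytes)
}

// Example: frame 2 bytes of Avro data under schema ID 42.
val framed = ByteBuffer.allocate(7)
  .put(0.toByte)
  .putInt(42)
  .put(Array[Byte](0x02, 0x04))
  .array()

val (id, body) = stripConfluentHeader(framed)
// id == 42, body == Array(0x02, 0x04)
```

In practice the proper fix is to use a registry-aware deserializer (as the answers below do) rather than stripping bytes by hand, since the schema ID is needed to resolve the writer schema.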
  • Did you find a working solution? Commented Feb 6, 2017 at 10:36
  • @aasthetic see below Commented Feb 7, 2017 at 17:33

2 Answers


We have been working on this library which may help: ABRiS (Avro Bridge for Spark)

It provides APIs for integrating Spark with Avro in both read and write operations (streaming and batch). It also supports Confluent Kafka and integrates with Schema Registry.

DISCLAIMER: I work for ABSA and I am the main developer behind this library.
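For illustration, a sketch of reading a Confluent-framed topic with ABRiS, based on the 3.x-era README. Package names, config keys, and the function name (`from_confluent_avro`) have changed across ABRiS versions, so treat this as an assumption-laden example and check the README for the version you actually depend on:

```scala
// Assumed API shape from the ABRiS 3.x README; verify against your version.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

val spark = SparkSession.builder().getOrCreate()

// Tell ABRiS where the registry is and how the schema subject is named.
val registryConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL   -> "http://localhost:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "mytopic",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY ->
    SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME,
  SchemaManager.PARAM_VALUE_SCHEMA_ID       -> "latest"
)

// from_confluent_avro understands the magic-byte + schema-ID framing,
// so the raw Kafka value column can be decoded directly.
val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .load()
  .select(from_confluent_avro(col("value"), registryConfig) as "data")
```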


5 Comments

After adding the ABRiS dependencies to the POM file, the method fromAvro is not there when I try ABRiS with the Spark Java API. Any help?
@Vignesh, the library is written in Scala but should be possible to be imported into a Java class. How are you trying to import it?
After adding all the necessary dependencies in the POM (i.e. Confluent and ABRiS), I tried to import the ABRiS libs as described in the github.com/AbsaOSS/ABRiS README, but at the line Dataset<Row> ds = sparksession.readstream().format("kafka").option("kafka.bootstrap.server","localhost:9092").option("subscribe","topicname"). the method fromAvro does not appear in the Eclipse IntelliSense, and there is no issue in the build.
fromConfluentAvro is not there even after all the imports, but in Scala it works fine.
The fromConfluentAvro method is not popping up in Eclipse IntelliSense when we use Java, but in Scala it works fine. Could you please shed some light on using ABRiS from Java?

I figured out you have to use the Confluent platform decoder if you want to read their stuff.

import java.util.Properties
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializerConfig}

def decoder: io.confluent.kafka.serializers.KafkaAvroDecoder = {
  val props = new Properties()
  // Point the decoder at the Schema Registry so it can resolve writer schemas
  props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
  // Deserialize into generated specific classes instead of GenericRecord
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  val vProps = new kafka.utils.VerifiableProperties(props)
  new io.confluent.kafka.serializers.KafkaAvroDecoder(vProps)
}
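To sketch how this decoder might be wired into the question's stream (an assumption on my part, reusing the question's `kafka` DataFrame and `Foo` class, not something from the original answer):

```scala
// Hypothetical wiring: decode each raw Kafka value with the
// registry-aware decoder. KafkaAvroDecoder.fromBytes reads the magic byte
// and schema ID itself, so no manual header stripping is needed.
// Note the decoder is built inside the task (via the def above) rather
// than captured from the driver, since it is not serializable.
import spark.implicits._

val decoded = kafka
  .select("value")
  .as[Array[Byte]]
  .map { bytes =>
    // SPECIFIC_AVRO_READER_CONFIG = true means we get the generated class back
    decoder.fromBytes(bytes).asInstanceOf[Foo]
  }
```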

2 Comments

Thanks @zzztimbo. I also registered a schema for my keys; they have to be of Long type. I am unable to deserialize them in Spark Streaming. Any idea?
It would be helpful to get a complete example of how to set up structured streaming with a Kafka topic + Schema Registry. It's not clear to me what to do with this decoder.
