I am trying to use the Structured Streaming approach, based on the DataFrame/Dataset API, to load a stream of data from Kafka.

I use:

  • Spark 2.1.0
  • Kafka 0.10
  • spark-sql-kafka-0-10

The Spark Kafka DataSource has a fixed underlying schema:

|key|value|topic|partition|offset|timestamp|timestampType|
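
For reference, calling printSchema() on the loaded stream shows key and value typed as binary:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 ...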

My data comes in JSON format and is stored in the value column. I am looking for a way to extract the underlying schema from the value column and expand the received DataFrame to the columns stored in value. I tried the approach below, but it does not work:

import org.apache.spark.sql.Column

val columns = Array("column1", "column2") // column names

val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

val columnsToSelect = columns.map(x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

// some analytics using the streaming DataFrame kafkaDF

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()

Here I get the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the values inside value are not known...

Do you have any suggestions?

1 Answer

From Spark's perspective, value is just a byte sequence. Spark has no knowledge of the serialization format or content. To be able to extract a field, you have to parse it first.

If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json, providing a schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._ // for the $"..." column syntax

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
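
For concreteness, a minimal sketch of this first option, assuming (hypothetically) that column1 is a string and column2 is an integer, which also flattens the parsed struct back into top-level columns:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._

// assumed payload types; adjust to match your actual JSON
val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", IntegerType)
))

val kafkaDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).as("data"))
  .select($"data.*") // expand the parsed struct into top-level columns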

Or you can cast to StringType and extract fields by path using get_json_object:

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???

// get_json_object expects a string column, hence the cast
val exprs = columns.map(c => get_json_object($"value".cast("string"), s"$$.$c").alias(c))

rawKafkaDF.select(exprs: _*)

and cast the results later to the desired types.
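
For example, a sketch of that final cast, assuming (again hypothetically) that column2 should be an integer; get_json_object always returns strings:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.IntegerType
import sparkSession.implicits._

val parsed = rawKafkaDF.select(
  get_json_object($"value".cast("string"), "$.column1").alias("column1"),
  get_json_object($"value".cast("string"), "$.column2").cast(IntegerType).alias("column2")
)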

3 Comments

FWIW, I find both alternatives clunky: the schema alternative requires specifying the schema in Spark-native code, so if we are shipping a complicated schema, code has to parse a schema file (XSD etc.) and build this object; the get_json_object alternative forces popping out paths/fields individually, and I'm not sure what the performance penalty of that is. I'd have preferred it if Apache Spark shipped with an easier way to cleanly accept a schema file and generate a Spark/Catalyst schema object (one workaround is sketched after these comments).
Another way to get a schema for the messages (it's yet another clunky way): stackoverflow.com/questions/48361177/…
I am having a hard time doing this in Python. Can anyone suggest the same?
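
Regarding the first comment: one possible workaround, sketched here under the assumption that you control how the schema file is produced, is to ship the schema as the JSON string generated by StructType.json and rebuild it at runtime with DataType.fromJson (the file name schema.json is hypothetical):

import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source

// schema.json is a hypothetical file containing the output of someStructType.json
val schemaJson = Source.fromFile("schema.json").mkString
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]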
