
I'm writing a Spark application in Scala using Spark Structured Streaming that receives JSON-formatted data from Kafka. The application can receive either a single JSON object or multiple JSON objects formatted in this way:

[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]

I tried to define a StructType like:

var schema = StructType(
  Array(
    StructField("key1", DataTypes.StringType),
    StructField("key2", DataTypes.StringType)
  ))

But it doesn't work. My current code for parsing the JSON:

var data = (this.stream).getStreamer().load()
  .selectExpr("CAST (value AS STRING) as json")
  .select(from_json($"json",schema=schema).as("data"))

I would like to get these JSON objects into a DataFrame like:

+----------+---------+
|      key1|     key2|
+----------+---------+
|    value1|   value2|
|    value1|   value2|
|       ...|      ...|
|    value1|   value2|
+----------+---------+

Can anyone help me, please? Thank you!

  • Before converting to JSON, explode your array and it should work. Commented Dec 16, 2018 at 14:42
  • Refer to this link: stackoverflow.com/questions/48361177/… I can't comment as I am yet to reach there. Commented Dec 16, 2018 at 22:48
  • @Sc0rpion, the schema is always the same. The structure is the problem Commented Dec 18, 2018 at 18:45
  • @vindev I tried, but it doesn't work Commented Dec 18, 2018 at 18:45
  • @Vinc Can you share what you tried and what error you got? Commented Dec 19, 2018 at 3:22

3 Answers


As your incoming string is an array of JSON objects, one way is to write a UDF to parse the array and then explode the parsed array. Below is the complete code with each step explained. I have written it for batch, but the same can be used for streaming with minimal changes.

import org.apache.spark.sql.SparkSession
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonParser {

  //case class to parse the incoming JSON String
  case class JSON(key1: String, key2: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.
      builder().
      appName("JSON").
      master("local").
      getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    //sample JSON array String coming from kafka
    val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")

    //UDF to parse JSON array String
    val jsonConverter = udf { jsonString: String =>
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(jsonString, classOf[Array[JSON]])
    }

    val df = str.toDF("json") //json String column
      .withColumn("array", jsonConverter($"json")) //parse the JSON Array
      .withColumn("json", explode($"array")) //explode the Array
      .drop("array") //drop unwanted columns
      .select("json.*") //explode the JSON to separate columns

    //display the DF
    df.show()
    //+------+------+
    //|  key1|  key2|
    //+------+------+
    //|value1|value2|
    //|value3|value4|
    //+------+------+

  }
}
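For the streaming case mentioned above, the same UDF slots into a Kafka source with minimal changes. A minimal sketch, reusing the jsonConverter UDF defined above; the broker address and topic name here are hypothetical placeholders, adjust them to your setup:

// Streaming sketch: reuses the jsonConverter UDF defined above.
// "localhost:9092" and "json-topic" are hypothetical placeholders.
val streamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "json-topic")
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .withColumn("array", jsonConverter($"json")) // parse the JSON array
  .withColumn("json", explode($"array"))       // one row per array element
  .drop("array")
  .select("json.*")                            // one column per key

streamDf.writeStream
  .format("console")
  .start()
  .awaitTermination()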



This worked fine for me in Spark 3.0.0 and Scala 2.12.10. I used schema_of_json to get the schema of the data in a format suitable for from_json, then applied explode and the * operator in the last step of the chain to expand the rows into separate columns.

// TO KNOW THE SCHEMA
scala> val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")
str: Seq[String] = List([{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}])

scala> val df = str.toDF("json")
df: org.apache.spark.sql.DataFrame = [json: string]

scala> df.show()
+--------------------+
|                json|
+--------------------+
|[{"key1":"value1"...|
+--------------------+

scala> val schema = df.select(schema_of_json(df.select(col("json")).first.getString(0))).as[String].first
schema: String = array<struct<key1:string,key2:string>>

Use the resulting string as your schema, 'array<struct<key1:string,key2:string>>', as follows:

// TO PARSE THE ARRAY OF JSON's
scala> val parsedJson1 = df.selectExpr("from_json(json, 'array<struct<key1:string,key2:string>>') as parsed_json")
parsedJson1: org.apache.spark.sql.DataFrame = [parsed_json: array<struct<key1:string,key2:string>>]

scala> parsedJson1.show()
+--------------------+
|         parsed_json|
+--------------------+
|[[value1, value2]...|
+--------------------+

scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json").select("json.*")
data: org.apache.spark.sql.DataFrame = [key1: string, key2: string]

scala> data.show()
+------+------+
|  key1|  key2|
+------+------+
|value1|value2|
|value3|value4|
+------+------+

Just FYI, without the star expansion the intermediate result looks as follows:

scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json")
data: org.apache.spark.sql.DataFrame = [json: struct<key1: string, key2: string>]

scala> data.show()
+----------------+
|            json|
+----------------+
|[value1, value2]|
|[value3, value4]|
+----------------+
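Putting the steps together outside the REPL, a minimal sketch of the whole chain (inferring the schema from the first row on the driver, then parsing every row) might look like this; it assumes the same df with a single json string column as above:

import org.apache.spark.sql.functions.{col, lit, schema_of_json}
import spark.implicits._ // for .as[String]

// Infer the schema once from a sample row, then apply it to the whole column.
val sample = df.select(col("json")).first.getString(0)
val schema = df.select(schema_of_json(lit(sample))).as[String].first

val data = df
  .selectExpr(s"from_json(json, '$schema') as parsed_json") // parse the array
  .selectExpr("explode(parsed_json) as json")               // one row per element
  .select("json.*")                                         // one column per key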


  1. Wrap your StructType in an ArrayType, and from_json will parse the incoming data as an array:
val schema = ArrayType(StructType(
  Array(
    StructField("key1", DataTypes.StringType),
    StructField("key2", DataTypes.StringType)
  )))
  2. Explode it to get one element of the JSON array in each row:
val explodedDf = df.withColumn("jsonData", explode(from_json(col("value"), schema)))
  .select($"jsonData")
explodedDf.show
+----------------+
|        jsonData|
+----------------+
|[value1, value2]|
|[value3, value4]|
+----------------+
  3. Select the JSON keys:
explodedDf.select("jsonData.*").show
+------+------+
|  key1|  key2|
+------+------+
|value1|value2|
|value3|value4|
+------+------+
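Applied to the streaming code from the question, this schema drops straight into the existing pipeline. A minimal sketch, keeping the asker's getStreamer() helper as-is:

// Sketch: parse the Kafka value as an array of structs, then flatten
// to one row per JSON object with one column per key.
val data = this.stream.getStreamer().load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(explode(from_json($"json", schema)).as("data"))
  .select("data.*")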

