5

I have a file with a bunch of columns. One column, jsonstring, is of string type and holds JSON strings. Let's say the format is the following:

{
    "key1": "value1",
    "key2": {
        "level2key1": "level2value1",
        "level2key2": "level2value2"
    }
}

I want to parse this column with something like jsonstring.key1, jsonstring.key2.level2key1 to return value1, level2value1.

How can I do that in Scala or Spark SQL?

2 Answers

9

With Spark 2.2 you can use the from_json function, which does the JSON parsing for you.

from_json(e: Column, schema: String, options: Map[String, String]): Column parses a column containing a JSON string into a StructType or ArrayType of StructTypes with the specified schema.

Combined with the support for flattening nested columns using * (star), that seems to be the best solution.

// the input dataset (just a single JSON blob)
val jsonstrings = Seq("""{
    "key1": "value1",
    "key2": {
        "level2key1": "level2value1",
        "level2key2": "level2value2"
    }
}""").toDF("jsonstring")

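// define the schema of JSON messages
// (spark-shell pre-imports spark.implicits._ and functions._; elsewhere import them explicitly)
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._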
val key2schema = new StructType()
  .add($"level2key1".string)
  .add($"level2key2".string)
val schema = new StructType()
  .add($"key1".string)
  .add("key2", key2schema)
scala> schema.printTreeString
root
 |-- key1: string (nullable = true)
 |-- key2: struct (nullable = true)
 |    |-- level2key1: string (nullable = true)
 |    |-- level2key2: string (nullable = true)

val messages = jsonstrings
  .select(from_json($"jsonstring", schema) as "json")
  .select("json.*") // <-- flattening nested fields
scala> messages.show(truncate = false)
+------+---------------------------+
|key1  |key2                       |
+------+---------------------------+
|value1|[level2value1,level2value2]|
+------+---------------------------+

scala> messages.select("key1", "key2.*").show(truncate = false)
+------+------------+------------+
|key1  |level2key1  |level2key2  |
+------+------------+------------+
|value1|level2value1|level2value2|
+------+------------+------------+
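
For the path-style access the question asks about, a schema-free alternative (not part of the approach above) is get_json_object, which takes a JSONPath-like string and returns null when the path is missing. The following is a minimal sketch, assuming a local SparkSession; the app name and the view name t are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("json-paths").getOrCreate()
import spark.implicits._

val jsonstrings = Seq(
  """{"key1": "value1", "key2": {"level2key1": "level2value1", "level2key2": "level2value2"}}"""
).toDF("jsonstring")

// DataFrame API: one get_json_object call per path
val extracted = jsonstrings.select(
  get_json_object($"jsonstring", "$.key1").as("key1"),
  get_json_object($"jsonstring", "$.key2.level2key1").as("level2key1"))

// Spark SQL: the same function is available in queries (view name `t` is hypothetical)
jsonstrings.createOrReplaceTempView("t")
val viaSql = spark.sql(
  "SELECT get_json_object(jsonstring, '$.key1') AS key1, " +
  "get_json_object(jsonstring, '$.key2.level2key1') AS level2key1 FROM t")
```

Each call re-parses the JSON string, so from_json with a schema is likely the better fit when many fields are needed.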

4

You can use withColumn + udf + json4s:

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions._

def getJsonContent(jsonstring: String): (String, String) = {
    implicit val formats = DefaultFormats
    val parsedJson = parse(jsonstring)  
    val value1 = (parsedJson \ "key1").extract[String]
    val level2value1 = (parsedJson \ "key2" \ "level2key1").extract[String]
    (value1, level2value1)
}
val getJsonContentUDF = udf((jsonstring: String) => getJsonContent(jsonstring))

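// df is the DataFrame holding the jsonstring column; parsedJson becomes a struct with fields _1 and _2
val parsed = df.withColumn("parsedJson", getJsonContentUDF(df("jsonstring")))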

1 Comment

Thanks, this was helpful. Does anyone have a snippet to handle the exception? For example, if key2 does not exist, it fails; I would like to handle that and just return "not found" or something like that. I am sure it's trivial and I can figure it out, but it would be good to get a snippet if someone has one ;)
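
One way to handle a missing key, as a minimal sketch rather than a definitive implementation: json4s returns JNothing for an absent path, and extractOpt turns that into None instead of throwing. The helper name getJsonContentSafe and the "not found" placeholder are assumptions for illustration; parsing is wrapped in Try so malformed input also falls back to the placeholder.

```scala
import scala.util.Try
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods.parse

// Hypothetical variant of getJsonContent that never throws on missing keys or bad JSON.
def getJsonContentSafe(jsonstring: String): (String, String) = {
  implicit val formats: Formats = DefaultFormats
  val notFound = "not found"
  Try(parse(jsonstring)).toOption match {
    case Some(json) =>
      // extractOpt yields None when the path is absent or the value is not a string
      val value1 = (json \ "key1").extractOpt[String].getOrElse(notFound)
      val level2value1 = (json \ "key2" \ "level2key1").extractOpt[String].getOrElse(notFound)
      (value1, level2value1)
    case None =>
      // input could not be parsed as JSON at all
      (notFound, notFound)
  }
}
```

It can be wrapped in udf the same way as getJsonContent. Returning Option[String] instead of a placeholder string would give null in the resulting column, which may be easier to filter on.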
