
I am reading a DynamoDB table from Spark. This table has one JSON string in one field and plain strings in the other fields. I am able to read the JSON field but not the nested JSON fields. This is not a DUPLICATE of query Json Column using dataframes. That question does explain how to extract columns from a JSON string, but not the nested JSON columns.

import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")

users.show(1)

Sample data set:

 |col1                                                          | ID | field2 | field3 |
 ----------------------------------------------------------------------------------------
 |{"a":[{"b":"value1","x":23},{"b":"value2","x":52}],"c":"valC"}| A1 | X1     | Y1     |

I need to extract a few fields from col1 (the JSON structure) and the ID field. I was able to figure out how to parse the JSON field (col1) and get field 'c' from it as explained here, but I am not able to extract the nested fields.

My code:

val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID")

data.show(1,false)
|a                                                |c   |ID|
-----------------------------------------------------------
|[{"b":"value1","x":23},{"b":"value2","x":52}...] |valC|A1|

Now when I try to apply the same get_json_object to the above data frame, I get all null values.

val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)

|get_json_object(a, '$.b')| c  | ID|
------------------------------------
|null                     |valC|A1 |    

I tried explode as well, since col 'a' holds an array of structs. But that didn't work either, because the data frame 'data' returns col/field 'a' as a string instead of an array. Any ideas how to solve this?
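For reference, the explode attempt looked roughly like this (reconstructed here for illustration; the original call wasn't shown):

import org.apache.spark.sql.functions.explode

// Fails with an AnalysisException: explode expects an array or map column,
// but get_json_object returns 'a' as a plain JSON string
data.select(explode($"a"))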

Update: I also tried parsing with JSON4S and net.liftweb.json.parse. That didn't help either:

case class aInfo(b: String)
case class col1(a: Option[aInfo], c: String)

import net.liftweb.json.parse

val parseJson = udf((data: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(data).extract[col1]
})

val parsed = users.withColumn("parsedJSON", parseJson($"col1"))
parsed.show(1)

All values came out as null when I used these parsers.

My expected result: I am trying to get a flattened structure out of the dataset:

|b     |x |c   | ID|
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |

1 Answer


I believe that all the required pieces of the puzzle are already here, so let's follow this step by step. Your data is equivalent to:

val df = Seq((
  """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1"
)).toDF("col1", "ID",  "field2", "field3")

Spark provides json4s, which implements the same query API as Lift:

import org.json4s._
import org.json4s.jackson.JsonMethods._

and we can use, for example, the LINQ-style API to define a UDF:

// Collects every "b" string under the "a" array of the JSON document
val getBs = udf((s: String) => for {
  JString(b) <- parse(s) \ "a" \ "b"
} yield b)

If you want to extract multiple fields you can of course extend this. For example, if the JSON string has multiple fields:

{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}

you can:

for {
  JObject(a) <- parse(s) \ "a"
  JField("b", JString(b)) <- a
  JField("d", JInt(d)) <- a
} yield (b, d)
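A sketch of how that comprehension could be wrapped in a UDF and flattened with explode; the UDF name, the bd alias, and applying it to a col1 holding the extended JSON above are all illustrative:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions.{explode, udf}

// Yields a (b, d) pair for each element of "a"; JInt carries a BigInt,
// hence the toInt conversion to a Spark-friendly type
val getBDs = udf((s: String) => for {
  JObject(a) <- parse(s) \ "a"
  JField("b", JString(b)) <- a
  JField("d", JInt(d)) <- a
} yield (b, d.toInt))

df.withColumn("bd", explode(getBDs($"col1")))
  .select($"bd._1".as("b"), $"bd._2".as("d"), $"ID")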

This assumes that both fields are present; otherwise there won't be a match. To handle missing fields you may prefer XPath-like expressions or extractors:

case class A(b: Option[String], d: Option[Int])

(parse(s) \ "a").extract[Seq[A]]
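For completeness, a self-contained version of that extractor approach (the value names are illustrative); note that extract needs an implicit Formats in scope:

import org.json4s._
import org.json4s.jackson.JsonMethods._

case class A(b: Option[String], d: Option[Int])

implicit val formats = DefaultFormats

val s = """{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}"""
(parse(s) \ "a").extract[Seq[A]]
// List(A(Some(value1),Some(1)), A(Some(value2),Some(2)))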

A UDF like this can be used with explode to extract fields:

val withBs = df.withColumn("b", explode(getBs($"col1")))

with result:

+--------------------+---+------+------+------+
|                col1| ID|field2|field3|     b|
+--------------------+---+------+------+------+
|{"a":[{"b":"value...| A1|    X1|    Y1|value1|
|{"a":[{"b":"value...| A1|    X1|    Y1|value2|
+--------------------+---+------+------+------+

Your attempt to use Lift is incorrect because you expect a to be a sequence of aInfo but define it only as Option[aInfo]. It should be Option[Seq[aInfo]]:

case class col1(a: Option[Seq[aInfo]], c: String)

With the class defined like this, parsing should work without an issue.
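A minimal sketch verifying the corrected classes with the Lift parser outside Spark (the sample string reuses the literal from the df above):

import net.liftweb.json.{parse, DefaultFormats}

case class aInfo(b: String)
case class col1(a: Option[Seq[aInfo]], c: String)

implicit val formats = DefaultFormats

val s = """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}"""
parse(s).extract[col1]
// col1(Some(List(aInfo(value1), aInfo(value2))),valC)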

If you use a current build (Spark 2.1.0), there is a from_json method, introduced by SPARK-17699, which requires a schema:

import org.apache.spark.sql.types._

val bSchema = StructType(Seq(StructField("b", StringType, true)))
val aSchema = StructField("a", ArrayType(bSchema), true)
val cSchema = StructField("c", StringType, true)

val schema = StructType(Seq(aSchema, cSchema))

and can be applied as:

import org.apache.spark.sql.functions.from_json

val parsed = df.withColumn("col1", from_json($"col1", schema))

After that you can select fields using usual notation:

parsed.select($"col1.a.b")
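To get all the way to the flattened layout from the question, one could extend the schema with the numeric x field and apply it to the full sample string. This is a sketch under those assumptions; the simplified df above omits x, so a fuller frame is constructed here:

import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types._

// The question's original col1 value, including the x field
val fullDf = Seq((
  """{"a":[{"b":"value1","x":23},{"b":"value2","x":52}],"c":"valC"}""", "A1"
)).toDF("col1", "ID")

val fullSchema = StructType(Seq(
  StructField("a", ArrayType(StructType(Seq(
    StructField("b", StringType, true),
    StructField("x", IntegerType, true)
  ))), true),
  StructField("c", StringType, true)
))

fullDf
  .withColumn("col1", from_json($"col1", fullSchema))
  .select(explode($"col1.a").as("a"), $"col1.c".as("c"), $"ID")
  .select($"a.b", $"a.x", $"c", $"ID")

// +------+---+----+---+
// |     b|  x|   c| ID|
// +------+---+----+---+
// |value1| 23|valC| A1|
// |value2| 52|valC| A1|
// +------+---+----+---+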