1

I have a JSON string that I load into a Spark DataFrame. The JSON string can have between 0 and 3 key-value pairs.

When more than one kv pairs are sent, the product_facets is correctly formatted as an array like below:

{"id":1,
  "productData":{
  "product":{
  "product_name":"xyz",
  "product_facets":{"entry":[{"key":"test","value":"success"}, {"key": "test2","value" : "fail"}]}
 }}}

I can now use the explode function:

sourceDF.filter($"someKey".contains("some_string"))
  .select($"id", explode($"productData.product.product_facets.entry") as "kvPairs")

However when only one key value was sent, the source JSON string for entry is not formatted as an array with square braces:

{"id":1,
  "productData":{
  "product":{
  "product_name":"xyz",
  "product_facets":{"entry":{"key":"test","value":"success"}}
 }}}

The schema for product tag looks like:

|    |-- product: struct (nullable = true)
|    |    |-- product_facets: struct (nullable = true)
|    |    |    |-- entry: string (nullable = true)
|    |    |-- product_name: string (nullable = true)

How can I change the entry to an array of key value pairs that is compatible with the explode function. My end goal is to pivot the keys into individual columns and I want to use group by on exploding the kv pairs. I tried using from_json but could not get it to work.

    val schema =
    StructType(
      Seq(
        StructField("entry", ArrayType(
          StructType(
            Seq(
              StructField("key", StringType),
              StructField("value",StringType)
            )
          )
        ))
      )
    )

sourceDF.filter($"someKey".contains("some_string"))
      .select($"id", from_json($"productData.product.product_facets.entry", schema) as "kvPairsFromJson")

But the above does creates a new column kvPairsFromJson that looks like "[]" and using explode does nothing.

Any pointers on whats going on or if there is a better way to do this?

3
  • You have two kinds of data: one which its 'product_facets' is an array and another which its 'product_facets' is a string. Am I right? You're trying to load both and address them as a single-kind field (product_facets-wise). Is that the case? Commented Aug 15, 2019 at 7:29
  • @nir-hedvat Yes, that is correct. One is an array and in other cases it is a string and I want to treat them both as an array to be able to use the explode function. Commented Aug 15, 2019 at 14:22
  • This is not feasible using a simple SQL query because of the fact that Spark cannot address data with multiple schemas (both read & write). You should use UDF to achieve this. Take a look at this docs.databricks.com/spark/latest/spark-sql/udf-scala.html. Just pass the field that holds the data and always return an array for it. Commented Aug 15, 2019 at 14:58

1 Answer 1

1

I think one approach could be :
1. Create a udf which takes entry value as json string, and converts it to List( Tuple(K, V))
2. In udf, check if entry value is array or not and do conversion accordingly.

The code below explains above approach:

// one row where entry is array and other non-array
val ds = Seq("""{"id":1,"productData":{"product":{"product_name":"xyz","product_facets":{"entry":[{"key":"test","value":"success"},{"key":"test2","value":"fail"}]}}}}""", """{"id":2,"productData":{"product":{"product_name":"xyz","product_facets":{"entry":{"key":"test","value":"success"}}}}}""").toDS

val df = spark.read.json(ds)

// Schema used by udf to generate output column    
import org.apache.spark.sql.types._
val outputSchema = ArrayType(StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", StringType, false)
)))

// Converts non-array entry value to array
val toArray = udf((json: String) => {

  import com.fasterxml.jackson.databind._
  import com.fasterxml.jackson.module.scala.DefaultScalaModule

  val jsonMapper = new ObjectMapper()
  jsonMapper.registerModule(DefaultScalaModule)

  if(!json.startsWith("[")) {
    val jsonMap = jsonMapper.readValue(json, classOf[Map[String, String]])
    List((jsonMap("key"), jsonMap("value")))
  } else {
    jsonMapper.readValue(json, classOf[List[Map[String, String]]]).map(f => (f("key"), f("value")))
  } 

}, outputSchema)

val arrayResult = df.select(col("id").as("id"), toArray(col("productData.product.product_facets.entry")).as("entry"))

val arrayExploded = df.select(col("id").as("id"), explode(toArray(col("productData.product.product_facets.entry"))).as("entry"))

val explodedToCols = df.select(col("id").as("id"), explode(toArray(col("productData.product.product_facets.entry"))).as("entry")).select(col("id"), col("entry.key").as("key"), col("entry.value").as("value"))

Results in:

scala> arrayResult.printSchema
root
 |-- id: long (nullable = true)
 |-- entry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: string (nullable = false)


scala> arrayExploded.printSchema
root
 |-- id: long (nullable = true)
 |-- entry: struct (nullable = true)
 |    |-- key: string (nullable = false)
 |    |-- value: string (nullable = false)

scala> arrayResult.show(false)
+---+--------------------------------+
|id |entry                           |
+---+--------------------------------+
|1  |[[test, success], [test2, fail]]|
|2  |[[test, success]]               |
+---+--------------------------------+

scala> arrayExploded.show(false)
+---+---------------+
|id |entry          |
+---+---------------+
|1  |[test, success]|
|1  |[test2, fail]  |
|2  |[test, success]|
+---+---------------+
Sign up to request clarification or add additional context in comments.

2 Comments

Thanks for writing a snippet. Thats really helpful. Before i accep this as an answer. I think we would need to handle cases where the input string to "json" can be empty. There are cases where product_facets is an empty string and how would one handle that in the udf?
I figured it out. the udf needs a slight modification to return an empty array in cases where product_facets is an empty string.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.