
Using a Spark DataFrame.

scala> val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input: org.apache.spark.sql.DataFrame = [p_id: string, p_meta: string]

scala> df_input.show()
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+

Given this input df, is it possible to split it by JSON key to create a new df_output like the one below?

df_output =

p_id    p_meta_key      p_meta_value
 p1         a                1
 p1         b                2
 p2         c                3

I am using Spark 3.0.0 / Scala 2.12.x, and I prefer to use spark.sql.functions._

2 Answers


Another alternative: from_json + explode

import org.apache.spark.sql.functions._

val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") )
  .toDF("p_id", "p_meta")
df_input.show(false)
    /**
      * +----+----------------+
      * |p_id|p_meta          |
      * +----+----------------+
      * |p1  |{"a": 1, "b": 2}|
      * |p2  |{"c": 3}        |
      * +----+----------------+
      */

df_input.withColumn("p_meta", from_json($"p_meta", "map<string, string>", Map.empty[String, String]))
  .selectExpr("p_id", "explode(p_meta) as (p_meta_key, p_meta_value)")
  .show(false)
    /**
      * +----+----------+------------+
      * |p_id|p_meta_key|p_meta_value|
      * +----+----------+------------+
      * |p1  |a         |1           |
      * |p1  |b         |2           |
      * |p2  |c         |3           |
      * +----+----------+------------+
      */
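
The from_json + explode pipeline can be mimicked in plain Scala to make the row-per-map-entry logic explicit (a minimal sketch: parseFlatJson is a hypothetical toy parser that only handles the flat, string-keyed JSON in this example, not general JSON):

```scala
// Toy flat-JSON parser: extracts "key": value pairs from strings like {"a": 1, "b": 2}
// (stands in for from_json with a map<string, string> schema)
def parseFlatJson(s: String): Map[String, String] =
  "\"(\\w+)\"\\s*:\\s*(\\w+)".r
    .findAllMatchIn(s)
    .map(m => m.group(1) -> m.group(2))
    .toMap

val input = Seq(("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}"""))

// Equivalent of explode on a map column: one output row per map entry
val output = input.flatMap { case (id, meta) =>
  parseFlatJson(meta).map { case (k, v) => (id, k, v) }
}
// output contains (p1,a,1), (p1,b,2), (p2,c,3)
```

This is only meant to show why explode produces one row per key; in Spark the same fan-out happens per map entry of the parsed column.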



The code below solves your problem; I have tested it on Spark 3.0.0 / Scala 2.12.10.


import org.apache.spark.sql.functions._

val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input.show()

/*
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+
*/
// UDF to convert a JSON-like string to a Map
def convert(str: String): Map[String, String] = {
  "(\\w+): (\\w+)".r.findAllIn(str).matchData.map(i => {
    (i.group(1), i.group(2))
  }).toMap
}
val udfConvert = spark.udf.register("udfConvert", convert _)

// Remove the double quotes so the regex in convert can match
val df = df_input.withColumn("p_meta", regexp_replace($"p_meta", "\"", ""))
df.show()

/*
+----+------------+
|p_id|      p_meta|
+----+------------+
|  p1|{a: 1, b: 2}|
|  p2|      {c: 3}|
+----+------------+
*/

val df1 = df.withColumn("new_col", udfConvert($"p_meta"))
df1.show()

/*
+----+------------+----------------+
|p_id|      p_meta|         new_col|
+----+------------+----------------+
|  p1|{a: 1, b: 2}|[a -> 1, b -> 2]|
|  p2|      {c: 3}|        [c -> 3]|
+----+------------+----------------+
*/

df1.select($"p_id", explode($"new_col"))
  .withColumnRenamed("key", "p_meta_key")
  .withColumnRenamed("value", "p_meta_value")
  .show()
/*
+----+----------+------------+
|p_id|p_meta_key|p_meta_value|
+----+----------+------------+
|  p1|         a|           1|
|  p1|         b|           2|
|  p2|         c|           3|
+----+----------+------------+
*/
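
The heart of this answer is the regex extraction inside convert; it can be exercised outside Spark to verify what the UDF produces (plain Scala, no Spark session needed):

```scala
// Same regex logic as the convert UDF above, runnable without Spark
def convert(str: String): Map[String, String] =
  "(\\w+): (\\w+)".r.findAllIn(str).matchData
    .map(m => (m.group(1), m.group(2)))
    .toMap

// After regexp_replace strips the double quotes, p_meta looks like {a: 1, b: 2}
val parsed = convert("{a: 1, b: 2}")
// parsed == Map("a" -> "1", "b" -> "2")
```

Note that this approach depends on the values being simple word tokens; nested JSON or quoted string values would defeat the regex, which is why the from_json answer is more robust.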

