
I am trying to aggregate a few fields in a dataset and transform them into a JSON array format. I used the concat_ws and lit functions to manually add the ":" separator, but I am sure there must be a better way to do this. Here is the code I have tried so far. I am on Spark 2.0.1, so I have no luck with the to_json function.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.concat_ws
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.struct

object Zipper {
  val warehouseLocation = s"file://${System.getProperty("user.dir")}/spark-warehouse"
  val spark = SparkSession
    .builder()
    .appName("jsonconvert")
    .config("spark.master", "local")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .getOrCreate()
  import spark.implicits._
  def main(args: Array[String]): Unit = {

    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
    df.show(false)

    df.groupBy($"name")
      .agg(collect_list(struct(
        concat_ws(":", lit("food"), $"food"),
        concat_ws(":", lit("price"), $"price")
      )).as("foods"))
      .show(false)
  }
} 



+----+------------------------------------------------------------------------------+
|name|foods                                                                         |
+----+------------------------------------------------------------------------------+
|john|[[food:tomato,price:1.99], [food:carrot,price:0.45], [food:banana,price:1.29]]|
|bill|[[food:apple,price:0.99], [food:taco,price:2.59]]                             |
+----+------------------------------------------------------------------------------+

Expected Output

+----+------------------------------------------------------------------------------------------------+
|name|foods                                                                                           |
+----+------------------------------------------------------------------------------------------------+
|john|[{"food":"tomato","price":1.99}, {"food":"carrot","price":0.45}, {"food":"banana","price":1.29}]|
|bill|[{"food":"apple","price":0.99}, {"food":"taco","price":2.59}]                                   |
+----+------------------------------------------------------------------------------------------------+

2 Answers

For Spark versions prior to 2.1, aggregate (food, price) by name, apply toJSON to the whole DataFrame, and extract the JSON objects with get_json_object as follows:

import org.apache.spark.sql.functions._

df.groupBy($"name").agg(collect_list(struct($"food", $"price")).as("food_price")).
  toJSON.
  select(
    get_json_object($"value", "$.name").as("name"),
    get_json_object($"value", "$.food_price").as("foods")
  ).
  show(false)
// +----+----------------------------------------------------------------------------------------------+
// |name|foods                                                                                         |
// +----+----------------------------------------------------------------------------------------------+
// |john|[{"food":"tomato","price":1.99},{"food":"carrot","price":0.45},{"food":"banana","price":1.29}]|
// |bill|[{"food":"apple","price":0.99},{"food":"taco","price":2.59}]                                  |
// +----+----------------------------------------------------------------------------------------------+
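
This works because toJSON serializes every row of the aggregated DataFrame into a JSON string, and get_json_object then pulls the name and food_price fields back out by JSONPath, so the to_json column function is never needed. If you would rather build the JSON strings directly, a hand-rolled UDF is another Spark 2.0-compatible option. A minimal sketch follows; the toJsonArray name is mine, and it assumes the values contain no characters that need JSON escaping and no nulls:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{collect_list, struct, udf}

// Hypothetical helper: format each (food, price) struct as a JSON object
// string. Assumes values need no JSON escaping and are non-null.
val toJsonArray = udf { rows: Seq[Row] =>
  rows.map(r => s"""{"food":"${r.getString(0)}","price":${r.getDouble(1)}}""")
}

df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("food_price"))
  .withColumn("foods", toJsonArray($"food_price"))
  .select($"name", $"foods")
  .show(false)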

2 Comments

As mentioned in the question, I am on an older Spark version, which doesn't have the to_json function readily available.
Missed that, my bad. Please see whether the revised solution works for you.
Another option is to keep the aggregation itself simple and let collect_list over a struct build the nested structure:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val vkDF2 = df.groupBy("name").agg(collect_list(struct(col("food"), col("price"))).alias("vaquarkhan_json"))

vkDF2.show()

Results:

+----+--------------------+
|name|     vaquarkhan_json|
+----+--------------------+
|john|[[tomato,1.99], [...|
|bill|[[apple,0.99], [t...|
+----+--------------------+

The spark-shell output shows the resulting schemas:

df: org.apache.spark.sql.DataFrame = [name: string, food: string ... 1 more field]
vkDF2: org.apache.spark.sql.DataFrame = [name: string, vaquarkhan_json: array<struct<food:string,price:double>>]
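
Note that this stops at an array<struct> column rather than JSON text. Since the DataFrame-level toJSON does exist in Spark 2.0 (unlike the to_json column function), a brief follow-up sketch for turning the aggregated rows into JSON strings or persisting them; the output path here is purely illustrative:

// Serialize each aggregated row to a JSON string; works on Spark 2.0.
vkDF2.toJSON.show(false)

// Or write the whole result out as JSON files (path is hypothetical).
vkDF2.write.json("/tmp/vaquarkhan_json")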

