
I have the following data frame (df) in Spark:

| group_1 | group_2 | year | value |
|-----------|-----------|------|----------|
| "School1" | "Student" | 2018 | name_aaa |
| "School1" | "Student" | 2018 | name_bbb |
| "School1" | "Student" | 2019 | name_aaa |
| "School2" | "Student" | 2019 | name_aaa |

What I want to have is

| group_1 | group_2 | values_map |
|-----------|-----------|----------------------------------------------------|
| "School1" | "Student" | [2018 -> [name_aaa, name_bbb], 2019 -> [name_aaa]] |
| "School2" | "Student" | [2019 -> [name_aaa]]                               |

I tried groupBy with collect_list() and map(), but it didn't work: the resulting map kept only the last value (name_aaa or name_bbb) per key. How can I achieve this with Apache Spark?

2 Answers


The other answer produces an array type, not a map. Here is how to get a true map-type column for your result.

import org.apache.spark.sql.functions._  // collect_list, struct, map_from_entries (Spark 2.4+)

df.groupBy("group_1", "group_2", "year")
  .agg(collect_list("value").as("value_list"))
  .groupBy("group_1", "group_2")
  .agg(collect_list(struct(col("year"), col("value_list"))).as("map_list"))
  .withColumn("values_map", map_from_entries(col("map_list")))
  .drop("map_list")
  .show(false)

No UDF is needed, and the result directly matches your expected output.

+-------+-------+--------------------------------------------------+
|group_1|group_2|values_map                                        |
+-------+-------+--------------------------------------------------+
|School2|Student|[2019 -> [name_aaa]]                              |
|School1|Student|[2018 -> [name_aaa, name_bbb], 2019 -> [name_aaa]]|
+-------+-------+--------------------------------------------------+
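For intuition, `map_from_entries` turns an array of `(key, value)` structs into a map. The same transformation can be modeled in plain Scala with the sample data hard-coded (the values below are just the example rows, not anything Spark produces):

```scala
// Model of the second groupBy stage: after the first aggregation, each
// group holds an array of (year, value_list) entries.
val mapList = Seq(
  ("2018", Seq("name_aaa", "name_bbb")),
  ("2019", Seq("name_aaa"))
)

// toMap is the plain-Scala analogue of map_from_entries: an array of
// key/value pairs collapses into a single map.
val valuesMap: Map[String, Seq[String]] = mapList.toMap

println(valuesMap)
```

Because the first `groupBy` includes `year`, each year appears at most once per group, so no entries are lost when the pairs are collapsed into a map.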



The solution could be:

scala> df1.show
+-------+-------+----+--------+
|group_1|group_2|year|   value|
+-------+-------+----+--------+
|school1|student|2018|name_aaa|
|school1|student|2018|name_bbb|
|school1|student|2019|name_aaa|
|school2|student|2019|name_aaa|
+-------+-------+----+--------+


scala> val df2 = df1.groupBy("group_1","group_2","year").agg(collect_list('value).as("value"))
df2: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 2 more fields]

scala> df2.show
+-------+-------+----+--------------------+
|group_1|group_2|year|               value|
+-------+-------+----+--------------------+
|school1|student|2018|[name_aaa, name_bbb]|
|school1|student|2019|          [name_aaa]|
|school2|student|2019|          [name_aaa]|
+-------+-------+----+--------------------+


scala> val myUdf = udf((year: String, values: Seq[String]) => Map(year -> values))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,ArrayType(StringType,true),true),Some(List(StringType, ArrayType(StringType,true))))

scala> val df3 = df2.withColumn("values",myUdf($"year",$"value")).drop("year","value")
df3: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 1 more field]
scala> val df4 = df3.groupBy("group_1","group_2").agg(collect_list("values").as("value_map"))
df4: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 1 more field]

scala> df4.printSchema
root
 |-- group_1: string (nullable = true)
 |-- group_2: string (nullable = true)
 |-- value_map: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: string (containsNull = true)


scala> df4.show(false)
+-------+-------+------------------------------------------------------+
|group_1|group_2|value_map                                             |
+-------+-------+------------------------------------------------------+
|school1|student|[[2018 -> [name_aaa, name_bbb]], [2019 -> [name_aaa]]]|
|school2|student|[[2019 -> [name_aaa]]]                                |
+-------+-------+------------------------------------------------------+

Let me know if it helps!!
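Note that `value_map` here is an array of single-entry maps, not one map per group (see the schema above). If a single map is wanted, the per-group maps can be folded together; the merge itself is ordinary Scala, and the UDF wrapper is only sketched in a comment:

```scala
// Each group's value_map is a sequence of one-entry maps, e.g. for School1:
val entries = Seq(
  Map("2018" -> Seq("name_aaa", "name_bbb")),
  Map("2019" -> Seq("name_aaa"))
)

// Folding with ++ merges them into a single map. Years are unique per
// group after the first groupBy, so no keys collide and nothing is lost.
val merged: Map[String, Seq[String]] = entries.reduce(_ ++ _)

// In Spark this could be wrapped in a UDF (untested sketch):
// val mergeMaps = udf((ms: Seq[Map[String, Seq[String]]]) => ms.reduce(_ ++ _))
// df4.withColumn("values_map", mergeMaps($"value_map")).drop("value_map")
```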

5 Comments

can you tell me the reason for downvote? Feedback will be greatly appreciated.
Thank you, it works. What should I replace Seq[String] with if I had a Struct<...> instead of String? edit: I already upvoted/accepted your solution; it was probably someone else who downvoted your answer.
can you give me the details of the schema?
it's Person(id: Int, firstName: String, lastName: String)
Thanks man!! I think this link stackoverflow.com/question/42931796/… might help you figure out how to pass a struct type to a udf. Let me know if you have any questions!!
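On the Struct<...> question above: in a Scala Spark UDF a struct column arrives as an org.apache.spark.sql.Row, whose fields are read with getAs. The grouping logic itself can be modeled with a plain case class; Person and the sample values below are hypothetical stand-ins for the commenter's schema:

```scala
// Hypothetical stand-in for the Struct<id: Int, firstName: String, lastName: String> column.
case class Person(id: Int, firstName: String, lastName: String)

// The UDF body from the answer, with Seq[String] swapped for Seq[Person].
def toYearMap(year: String, values: Seq[Person]): Map[String, Seq[Person]] =
  Map(year -> values)

val people = Seq(Person(1, "Ada", "Lovelace"), Person(2, "Alan", "Turing"))
val byYear = toYearMap("2019", people)

// In the actual Spark UDF the struct arrives as a Row, so the signature
// becomes udf((year: String, values: Seq[Row]) => ...), and fields are read
// with row.getAs[Int]("id"), row.getAs[String]("firstName"), etc. (untested sketch)
```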
