
Suppose I have the following dataframe:

id | myStruct
___________________
1  | [val1, val2]
___________________
2  | [val3, val4]
___________________
1  | [val5, val6]

I would like to group all myStructs that share the same id into an array column of myStructs. So, the above dataframe should become

id | myStruct
__________________________________
1  | [[val1, val2], [val5, val6]]
__________________________________
2  | [[val3, val4]]

I know there is an array function, but that only converts each column into an array of size 1. How do I coalesce the resulting arrays?

I am using Spark 1.5.2 in a Scala shell.

Given that I am using Spark 1.5.2, I cannot use collect_list or collect_set.

2 Answers


If you are on Spark 1.5 and cannot upgrade, the simplest option is RDD.groupByKey:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rows = df.rdd
  // Pair each row up as (id, struct), collect all structs per id, then rebuild Rows.
  .map { case Row(id, myStruct) => (id, myStruct) }
  .groupByKey
  // groupByKey yields an Iterable; materialize it as a Seq so it maps cleanly to an ArrayType column.
  .map { case (id, myStructs) => Row(id, myStructs.toSeq) }

// Keep the original id field as-is and wrap the original struct type in an ArrayType.
val schema = StructType(Seq(
  df.schema("id"),
  StructField("myStructs", ArrayType(df.schema("myStruct").dataType))
))

sqlContext.createDataFrame(rows, schema)
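
For reference, this is the kind of input frame the snippet above expects; a minimal sketch mirroring the question (the two-string struct and the column names a/b are just illustrative):

import org.apache.spark.sql.functions.struct

// Build the question's example: an id plus a two-field struct column.
val df = sc.parallelize(Seq(
    (1, "val1", "val2"),
    (2, "val3", "val4"),
    (1, "val5", "val6")
  )).toDF("id", "a", "b")
  .select($"id", struct($"a", $"b").alias("myStruct"))

// After running the snippet above:
// id 1 -> [[val1,val2], [val5,val6]]
// id 2 -> [[val3,val4]]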

It can be generalized to composite keys and values by packing them into a pair of struct columns first:

import org.apache.spark.sql.functions.struct

df.select(
  struct($"key1", $"key2", ..., $"keyn").alias("id"),
  struct($"val1", $"val2", ..., $"valn").alias("myStruct")
)
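
Once the keys and values are packed like that, the same recipe applies to the packed frame instead of df; the composite key is itself a struct, and each one ends up with a single array of value structs. A minimal sketch, with the key1/key2/val1/val2 names above standing in for your own columns:

// `packed` is the result of the select above (column names are placeholders).
val packed = df.select(
  struct($"key1", $"key2").alias("id"),
  struct($"val1", $"val2").alias("myStruct")
)

// Reuse the groupByKey snippet with `packed` in place of `df`.
val rows = packed.rdd
  .map { case Row(id, myStruct) => (id, myStruct) }
  .groupByKey
  .map { case (id, myStructs) => Row(id, myStructs.toSeq) }

val schema = StructType(Seq(
  packed.schema("id"),
  StructField("myStructs", ArrayType(packed.schema("myStruct").dataType))
))

sqlContext.createDataFrame(rows, schema)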


In Spark 2.0+ you can use collect_list to accomplish this:

scala> val df = sc.parallelize(Seq((1, ("v1", "v2")), (2, ("v3", "v4")), (1, ("v5", "v6")))).toDF("id", "myStruct")
df: org.apache.spark.sql.DataFrame = [id: int, myStruct: struct<_1: string, _2: string>]

scala> df.show
+---+--------+
| id|myStruct|
+---+--------+
|  1| [v1,v2]|
|  2| [v3,v4]|
|  1| [v5,v6]|
+---+--------+

scala> import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.collect_list

scala> df.groupBy("id").agg(collect_list($"myStruct")).show
+---+----------------------+
| id|collect_list(myStruct)|
+---+----------------------+
|  1|    [[v1,v2], [v5,v6]]|
|  2|             [[v3,v4]]|
+---+----------------------+
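
The generated column name collect_list(myStruct) is a bit unwieldy; if you prefer, alias the aggregate (a small optional tweak):

// Same aggregation, just with a friendlier output column name.
df.groupBy("id").agg(collect_list($"myStruct").alias("myStructs"))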

However, in Spark 1.5.2 you would need something like this:

scala> val df2 = df.select($"id", $"myStruct._1".as("p1"), $"myStruct._2".as("p2"))
df2: org.apache.spark.sql.DataFrame = [id: int, p1: string, p2: string]

scala> df2.show
+---+---+---+
| id| p1| p2|
+---+---+---+
|  1| v1| v2|
|  2| v3| v4|
|  1| v5| v6|
+---+---+---+

scala> val rdd = df2.rdd.map{case Row(id: Int, p1: String, p2: String) => (id, (p1, p2))}
rdd: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[47] at map at <console>:32

scala> val finalDF = rdd.groupByKey.map(x => (x._1, x._2.toList)).toDF("id", "structs")
finalDF: org.apache.spark.sql.DataFrame = [id: int, structs: array<struct<_1:string,_2:string>>]

scala> finalDF.show
+---+------------------+
| id|           structs|
+---+------------------+
|  1|[[v1,v2], [v5,v6]]|
|  2|         [[v3,v4]]|
+---+------------------+
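
One small caveat: rdd.toDF(...) works here because the spark-shell already has the SQLContext implicits in scope. In a standalone program you would bring them in yourself, along the lines of:

// Needed outside spark-shell for rdd.toDF(...) to compile
// (sqlContext being your SQLContext instance).
import sqlContext.implicits._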

2 Comments

Correct, but I think collect_list was only introduced in Spark 1.6.0, hence OP can't use it (Spark 1.5.2) unless they upgrade...
Yes, unfortunately I cannot upgrade my spark, so I can't use collect_list.
