I am working with Spark DataFrames and I need to group by a column and collect the column values of the grouped rows into an array as a new column. Example:

Input:

employee | Address
------------------
Micheal  |  NY
Micheal  |  NJ

Output:

employee | Address
------------------
Micheal  | (NY,NJ)

Any help is highly appreciated!

  • Seems like you can use groupByKey to get what you want, which will give you an Iterable of [Address]. Commented Mar 31, 2016 at 11:46
  • @Manas this is the error I get: groupByKey is not a member of org.apache.spark.sql.DataFrame Commented Mar 31, 2016 at 11:56
  • 2
    show us your code..... Commented Mar 31, 2016 at 12:53
  • Hi @vds, say your dataframe is Input; then you write Input.groupBy(Input.col("employee")). You can take a look at the Spark API reference. Commented Mar 31, 2016 at 19:25
  • 1
    look at collect_list() Commented Nov 22, 2016 at 21:04

2 Answers


Here is an alternate solution where I convert the DataFrame to an RDD for the transformations and then convert it back to a DataFrame using sqlContext.createDataFrame().

Sample.json

{"employee":"Michale","Address":"NY"}
{"employee":"Michale","Address":"NJ"}
{"employee":"Sam","Address":"NY"}
{"employee":"Max","Address":"NJ"}

Spark Application

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val df = sqlContext.read.json("sample.json")

// Printing the original Df
df.show()

//Defining the Schema for the aggregated DataFrame
val dataSchema = new StructType(
  Array(
    StructField("employee", StringType, nullable = true),
    StructField("Address", ArrayType(StringType, containsNull = true), nullable = true)
  )
)
// Converting the df to an RDD and performing the groupBy operation
val aggregatedRdd: RDD[Row] = df.rdd
  .groupBy(r => r.getAs[String]("employee"))
  .map { case (employee, rows) =>
    // Mapping the grouped values to a new Row object
    Row(employee, rows.map(_.getAs[String]("Address")).toArray)
  }

// Creating a DataFrame from the aggregatedRdd with the defined Schema (dataSchema)
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, dataSchema)

// Printing the aggregated Df
aggregatedDf.show()

Output :

+-------+--------+
|Address|employee|
+-------+--------+
|     NY| Michale|
|     NJ| Michale|
|     NY|     Sam|
|     NJ|     Max|
+-------+--------+

+--------+--------+
|employee| Address|
+--------+--------+
|     Sam|    [NY]|
| Michale|[NY, NJ]|
|     Max|    [NJ]|
+--------+--------+
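If you then need the grouped addresses back as a Scala collection on the driver, the array column can be read with getAs. This is a sketch assuming the aggregated DataFrame and schema defined above; an ArrayType(StringType) column is exposed to Scala as a Seq[String]:

```scala
// Pulling the array column out of the aggregated rows.
aggregatedDf.collect().foreach { row =>
  val employee  = row.getAs[String]("employee")
  val addresses = row.getAs[Seq[String]]("Address")
  println(s"$employee -> ${addresses.mkString(", ")}")
}
```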

1 Comment

This answer works, but using the RDD API is slower than the DataFrame API by a large margin (due to the lack of the query optimizer and Tungsten).

If you're using Spark 2.0+, you can use collect_list or collect_set. Your query will be something like (assuming your dataframe is called input):

import org.apache.spark.sql.functions._

input.groupBy('employee).agg(collect_list('Address))

If you are ok with duplicates, use collect_list. If you're not ok with duplicates and only need unique items in the list, use collect_set.
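If you want the result column to keep the original name rather than the default "collect_list(Address)", you can alias the aggregate. A minimal sketch, using the standard Column alias method:

```scala
import org.apache.spark.sql.functions._

// collect_set drops duplicate addresses; alias renames the result column
// back to "Address".
input.groupBy('employee)
  .agg(collect_set('Address).alias("Address"))
  .show()
```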

Hope this helps!
