
I created a PySpark DataFrame using the following code:

testlist = [
             {"category":"A","name":"A1"}, 
             {"category":"A","name":"A2"}, 
             {"category":"B","name":"B1"},
             {"category":"B","name":"B2"}
]

spark_df = spark.createDataFrame(testlist)

Result:

category    name
A           A1
A           A2
B           B1
B           B2

I want to make it appear as follows:

category    name
A           A1, A2
B           B1, B2

I tried the following code, which does not work:

spark_df.groupby('category').agg('name', lambda x:x + ', ')

Can anyone help identify what I am doing wrong and the best way to make this happen?

3 Answers


One option is to use pyspark.sql.functions.collect_list() as the aggregate function.

from pyspark.sql.functions import collect_list
grouped_df = spark_df.groupby('category').agg(collect_list('name').alias("name"))

This will collect the values for name into a list and the resultant output will look like:

grouped_df.show()
#+---------+---------+
#|category |name     |
#+---------+---------+
#|A        |[A1, A2] |
#|B        |[B1, B2] |
#+---------+---------+

Update 2019-06-10: If you wanted your output as a concatenated string, you can use pyspark.sql.functions.concat_ws to concatenate the values of the collected list, which will be better than using a udf:

from pyspark.sql.functions import concat_ws

grouped_df.withColumn("name", concat_ws(", ", "name")).show()
#+---------+-------+
#|category |name   |
#+---------+-------+
#|A        |A1, A2 |
#|B        |B1, B2 |
#+---------+-------+
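If you don't need the intermediate array column, you can also collect and concatenate in a single aggregation. A minimal sketch, using the same spark_df from the question:

from pyspark.sql.functions import collect_list, concat_ws

# collect the names per category and join them with ", " in one pass
spark_df.groupby('category') \
    .agg(concat_ws(", ", collect_list('name')).alias("name")) \
    .show()

This should give the same A1, A2 / B1, B2 output as above.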

Original Answer: If you wanted your output as a concatenated string, you'd have to use a udf. For example, you can first do the groupBy() as above and then apply a udf to join the collected list:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

concat_list = udf(lambda lst: ", ".join(lst), StringType())

grouped_df.withColumn("name", concat_list("name")).show()
#+---------+-------+
#|category |name   |
#+---------+-------+
#|A        |A1, A2 |
#|B        |B1, B2 |
#+---------+-------+

2 Comments

Hi pault, thanks for your help. I am wondering how you got the answer. I tried exactly the same thing and I get the error 'NoneType does not have attribute withColumn'. Please let me know if I should correct something. Thanks!
Are you sure you didn't set grouped_df equal to the output of .show()? That error means the df you are calling withColumn() on is actually None.
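For reference, here is a minimal sketch of the mistake described in the comment above (the variable names are just illustrative):

# Wrong: .show() prints the DataFrame and returns None,
# so grouped_df is None afterwards and grouped_df.withColumn(...) fails
grouped_df = spark_df.groupby('category').agg(collect_list('name').alias("name")).show()

# Right: keep the DataFrame, then call .show() separately
grouped_df = spark_df.groupby('category').agg(collect_list('name').alias("name"))
grouped_df.show()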

UNIQUE values

If you want unique values then use collect_set instead of collect_list

from pyspark.sql.functions import collect_set
grouped_df = spark_df.groupby('category').agg(collect_set('name').alias("unique_name"))
grouped_df.show()
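As a minimal sketch of the difference, assuming a hypothetical input that contains a duplicate name:

from pyspark.sql.functions import collect_set

# hypothetical data with a duplicated name in category A
dup_df = spark.createDataFrame([
    {"category": "A", "name": "A1"},
    {"category": "A", "name": "A1"},
    {"category": "A", "name": "A2"}
])

# collect_set drops the duplicate, so category A ends up with just A1 and A2
# (the order inside the resulting array is not guaranteed)
dup_df.groupby('category').agg(collect_set('name').alias("unique_name")).show()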



Another option is this

>>> df.rdd.reduceByKey(lambda x,y: x+','+y).toDF().show()
+---+-----+
| _1|   _2|
+---+-----+
|  A|A1,A2|
|  B|B1,B2|
+---+-----+
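If you want to keep the original column names instead of _1 and _2, you can pass them to toDF (a minimal sketch based on the snippet above):

>>> df.rdd.reduceByKey(lambda x,y: x+','+y).toDF(["category", "name"]).show()

This produces the same two rows, but with category and name as the headers.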

2 Comments

Hi Bala, thanks for your help. I tried your script on my data, which has more than two instances of each key. It gives me the error: ValueError: too many values to unpack (expected 2)
reduceByKey works with more than one instance of a key. Can you give us sample data with the exact pattern you are working with?
