
Is there a function similar to collect_list or collect_set that aggregates a column of maps into a single map in a (grouped) PySpark DataFrame? For example, this function might have the following behavior:

>>> df.show()

+--+---------------------------------+
|id|                             map |
+--+---------------------------------+
| 1|                    Map(k1 -> v1)|
| 1|                    Map(k2 -> v2)|
| 1|                    Map(k3 -> v3)|
| 2|                    Map(k5 -> v5)|
| 3|                    Map(k6 -> v6)|
| 3|                    Map(k7 -> v7)|
+--+---------------------------------+

>>> df.groupBy('id').agg(collect_map('map')).show()

+--+----------------------------------+
|id|                 collect_map(map) |
+--+----------------------------------+
| 1| Map(k1 -> v1, k2 -> v2, k3 -> v3)|
| 2|                     Map(k5 -> v5)|
| 3|           Map(k6 -> v6, k7 -> v7)|
+--+----------------------------------+

It probably wouldn't be too difficult to produce the desired result using one of the other collect_ aggregations and a udf, but it seems like such a function should already exist.
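For reference, a DataFrame matching the example above can be built like this (a sketch; it assumes an active SparkSession named spark, and the Python dicts are inferred as map<string,string>):

df = spark.createDataFrame(
    [(1, {'k1': 'v1'}), (1, {'k2': 'v2'}), (1, {'k3': 'v3'}),
     (2, {'k5': 'v5'}),
     (3, {'k6': 'v6'}), (3, {'k7': 'v7'})],
    ['id', 'map'])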

4 Answers


The suggested solution with map_concat doesn't work, and this solution doesn't use UDFs.

For Spark >= 2.4:

import pyspark.sql.functions as f

(df
 .groupBy(f.col('id'))
 .agg(f.collect_list(f.col('map')).alias('maps'))
 .select('id',
         # fold the collected maps together with map_concat, starting from the first map
         f.expr('aggregate(slice(maps, 2, size(maps)), maps[0], (acc, element) -> map_concat(acc, element))')
          .alias('mapsConcatenated'))
)

collect_list ignores null values, so there is no need to worry about them when using map_concat inside the aggregate function.
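On Spark 3.1 and later, the same fold can be written with the native aggregate helper instead of an SQL expression string. A sketch, assuming string-typed keys and values:

from pyspark.sql import functions as f

result = (df.groupBy('id')
            .agg(f.collect_list('map').alias('maps'))
            .select('id',
                    f.aggregate('maps',
                                f.expr('cast(map() as map<string,string>)'),  # empty map as the initial accumulator
                                lambda acc, m: f.map_concat(acc, m))
                     .alias('mapsConcatenated')))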




I know it is probably poor form to answer your own question before others have had a chance, but in case someone is looking for a UDF-based version, here is one possible answer.

from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import MapType, StringType

# Merge a list of maps into a single dict
combineMap = udf(lambda maps: {key: m[key] for m in maps for key in m},
                 MapType(StringType(), StringType()))

df.groupBy('id')\
  .agg(collect_list('map').alias('maps'))\
  .select('id', combineMap('maps').alias('combined_map'))\
  .show()
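One behavioral note: because the dict comprehension iterates over the collected maps in order, a key that appears in more than one map ends up with the value from the last map containing it. The map_concat-based approaches behave differently on Spark 3, where duplicate keys raise an error by default unless spark.sql.mapKeyDedupPolicy is set to LAST_WIN.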



Using aggregate() with an empty map<string,string> as the initial value:

val df = Seq(
  (1, "k1", "v1"),
  (1, "k2", "v2"),
  (1, "k3", "v3"),
  (2, "k5", "v5"),
  (3, "k6", "v6"),
  (3, "k7", "v7")
).toDF("id", "k", "v")

df.show()

+---+---+---+
| id|  k|  v|
+---+---+---+
|  1| k1| v1|
|  1| k2| v2|
|  1| k3| v3|
|  2| k5| v5|
|  3| k6| v6|
|  3| k7| v7|
+---+---+---+

df.createOrReplaceTempView("id_map")

spark.sql("""
with
  t1 ( select *, map(k,v) m from id_map ),
  t2  (select id, collect_list(m) m1  from t1 group by id )
select id
  , aggregate(m1, cast(map() as map<string,string>), (acc,x) -> map_concat(acc,x)) m2 
from t2   
""").show(false)

+---+------------------------------+
|id |m2                            |
+---+------------------------------+
|1  |{k1 -> v1, k2 -> v2, k3 -> v3}|
|2  |{k5 -> v5}                    |
|3  |{k6 -> v6, k7 -> v7}          |
+---+------------------------------+

Or using struct() and map_from_entries():

spark.sql("""
with t1 ( select *, struct(k,v) m from id_map ),
     t2  (select id, collect_list(m) m1  from t1 group by id )
  select id, map_from_entries(m1) m2 from t2   
""").show(false)

+---+------------------------------+
|id |m2                            |
+---+------------------------------+
|1  |{k1 -> v1, k2 -> v2, k3 -> v3}|
|2  |{k5 -> v5}                    |
|3  |{k6 -> v6, k7 -> v7}          |
+---+------------------------------+
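The struct() plus map_from_entries() variant translates directly to PySpark as well; a sketch, assuming a DataFrame with id, k, v columns as in the example above:

from pyspark.sql import functions as f

# Collect the (k, v) pairs per id as structs, then build one map per group
result = (df.groupBy('id')
            .agg(f.map_from_entries(f.collect_list(f.struct('k', 'v'))).alias('m2')))
result.show(truncate=False)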



It's map_concat in PySpark >= 2.4.

2 Comments

I have tried using map_concat as the aggregator and I get this error: "expression 'test2' is neither present in the group by, nor is it an aggregate function." test2 is the name of the column that contains the maps.
This does not aggregate a list of maps as described in the question; rather, it merges multiple map columns into a single one.
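To make the comment above concrete: map_concat merges map columns within a row rather than aggregating maps across rows. A minimal sketch (hypothetical column names m1 and m2, assuming an active SparkSession named spark):

from pyspark.sql import functions as f

# map_concat combines two map columns of the same row into one map
df2 = spark.createDataFrame([({'k1': 'v1'}, {'k2': 'v2'})], ['m1', 'm2'])
df2.select(f.map_concat('m1', 'm2').alias('merged')).show(truncate=False)
# merged: {k1 -> v1, k2 -> v2}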
