
Consider the following DataFrame. Here I want the array of maps merged into one map without using UDFs.

+---+------------------------------------+
|id |greek                               |
+---+------------------------------------+
|1  |[{alpha -> beta}, {gamma -> delta}] |
|2  |[{epsilon -> zeta}, {etha -> theta}]|
+---+------------------------------------+

I think I've tried all the mapping functions in the PySpark 3 docs. I thought map_from_entries would do it, but it just throws an exception saying it requires maps, not an array of maps.

Although I'm aware that this is easily done using a UDF, I find it hard to believe that there is no easier way.

Runnable Python code:

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

df = spark.createDataFrame([
    (1, [{"alpha": "beta"}, {"gamma": "delta"}]),
    (2, [{"epsilon": "zeta"}, {"etha": "theta"}])
],
    schema=["id", "greek"]
)
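For reference, the desired result expressed in plain Python (a sketch of the semantics only, not Spark code) is just a per-row dict merge:

```python
rows = [
    (1, [{"alpha": "beta"}, {"gamma": "delta"}]),
    (2, [{"epsilon": "zeta"}, {"etha": "theta"}]),
]

# Merge each row's list of single-entry dicts into one dict
merged = [(i, {k: v for m in maps for k, v in m.items()}) for i, maps in rows]
# merged[0] == (1, {"alpha": "beta", "gamma": "delta"})
```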

3 Answers


Another version, using higher-order functions:

import pyspark.sql.functions as F

# Get the map type of the array elements, e.g. 'map<string,string>'
map_schema = df.selectExpr('greek[0]').dtypes[0][1]

expr = "REDUCE(greek, cast(map() as {schema}), (acc, el) -> map_concat(acc, el))".format(schema=map_schema)
df = df.withColumn("Concated", F.expr(expr))

Output:

+---+------------------------------------+--------------------------------+
|id |greek                               |Concated                        |
+---+------------------------------------+--------------------------------+
|1  |[{alpha -> beta}, {gamma -> delta}] |{alpha -> beta, gamma -> delta} |
|2  |[{epsilon -> zeta}, {etha -> theta}]|{epsilon -> zeta, etha -> theta}|
+---+------------------------------------+--------------------------------+
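REDUCE here is an ordinary left fold over the array; the same accumulation in plain Python (a sketch of the semantics, not Spark code) would be:

```python
from functools import reduce

maps = [{"alpha": "beta"}, {"gamma": "delta"}]

# Start from an empty dict (the cast(map() as ...) initial value)
# and fold each element in with a dict merge (the map_concat step)
merged = reduce(lambda acc, el: {**acc, **el}, maps, {})
# merged == {"alpha": "beta", "gamma": "delta"}
```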



I figured out one approach that uses the aggregate built-in:

import pyspark.sql.functions as F

## Aggregate needs a column with the array to be iterated,
## an initial value and a merge function.

## For the initial value, we need an empty map with corresponding map schema 
## which evaluates to (map<string,string>) in this case

map_schema = df.selectExpr('greek[0]').dtypes[0][1]
## F.create_map() creates a 'map<null,null>' type.
empty_map = F.create_map().cast(map_schema)

df.withColumn("Concated",
              F.aggregate(
                # Column with the array to iterate over
                col=F.col("greek"),
                # Initial value: an empty map of the matching type
                initialValue=empty_map,
                # Merge function combining the accumulator with each element
                merge=lambda acc, el: F.map_concat(acc, el)
              )
)

Edit

As pointed out by @kafels, duplicate keys need to be addressed. According to the Spark configuration docs, map_concat throws an exception when keys are duplicated. To avoid this, and let the last key win, set the following Spark SQL option:

spark.conf.set('spark.sql.mapKeyDedupPolicy', 'LAST_WIN')
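With LAST_WIN, a duplicate key keeps the value that arrives last, which mirrors Python's dict update semantics (a sketch of the behaviour, not Spark code):

```python
# Two maps sharing the key "alpha": under LAST_WIN the later value survives
maps = [{"alpha": "beta"}, {"alpha": "delta"}]

merged = {}
for m in maps:
    merged.update(m)
# merged == {"alpha": "delta"}
```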

4 Comments

- you can try using F.create_map().cast('map<string,string>') as initialValue and do it without slicing
- Awesome, @mck. If you know any clean way of getting the map schema/type from the column, that would be awesome!
- df.selectExpr('greek[0]').dtypes[0][1]
- Remember to set spark.conf.set('spark.sql.mapKeyDedupPolicy', 'LAST_WIN') if you want to merge duplicate keys

My approach is to explode the parent array, explode the keys, explode the values, then merge them all back together:

(df
    .withColumn('g', F.explode('greek'))
    .withColumn('k', F.explode(F.map_keys('g')))
    .withColumn('v', F.explode(F.map_values('g')))
    .groupBy('id')
    .agg(
        F.collect_list('k').alias('key'),
        F.collect_list('v').alias('value')
    )
    .withColumn('single_map', F.map_from_arrays('key', 'value'))
    .show(10, False)
)

# +---+---------------+-------------+--------------------------------+
# |id |key            |value        |single_map                      |
# +---+---------------+-------------+--------------------------------+
# |1  |[alpha, gamma] |[beta, delta]|{alpha -> beta, gamma -> delta} |
# |2  |[epsilon, etha]|[zeta, theta]|{epsilon -> zeta, etha -> theta}|
# +---+---------------+-------------+--------------------------------+
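One caveat worth noting: exploding keys and values in two separate steps relies on each map holding a single entry. With multi-entry maps, the independent explodes would produce a cross product of keys and values, which plain Python illustrates (a sketch of the pitfall, not Spark code):

```python
from itertools import product

m = {"a": "1", "b": "2"}

# Exploding keys and values independently pairs every key with every value,
# producing mismatched combinations like ("a", "2") and ("b", "1")
pairs = list(product(m.keys(), m.values()))
# len(pairs) == 4, not 2
```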

