
I have a "countries" dataframe with a map column:

+--------------------+
|                 map|
+--------------------+
|        [1 -> Spain]|
|      [2 -> Germany]|
| [3 -> Czech Repu...|
|        [4 -> Malta]|
+--------------------+

How can I access a value from the map using a key, and how can I then map values from a column in another dataframe using this map dataframe?

So from a "sale" dataframe like this:

+----------+----+
|country_id|Sale|
+----------+----+
|         1| 200|
|         2| 565|
+----------+----+

the country_id value will be mapped to the country name (and we will drop the country_id column):

+-------+----+
|country|Sale|
+-------+----+
|  Spain| 200|
|Germany| 565|
+-------+----+

I know about alternative approaches such as joins or dictionary maps, but here the question is only about Spark maps. I tried functions like element_at, but it hasn't worked properly.

  • If countries and sale are two different dataframes, there is no (good) way to do this without a join. Did you try to explode your map and join on key / country_id? Commented Nov 13, 2020 at 15:29
  • Yes, I know how to do it with a join; I am just curious how to do it with a map, since maps are usually meant for exactly that, something like map.key.get(value). I see that there are dozens of Spark functions to create maps, but their usability is really low. The only function I found is element_at, but I could not figure out how to use it in a real-world scenario :-) Maps are really good in theory, as using them guarantees that the number of rows (the data model) will not change, but how do you use them in Spark? The usual proposals are joins or a dictionary map, but if someone created Spark maps, can we use them? Commented Nov 13, 2020 at 15:37
  • But the map isn't just a MapType() object - it's a column of MapType(IntegerType(), StringType()) in a DataFrame. If you had the former (i.e. a MapType) or a regular python dictionary, you could do something differently because you can push the "get value from key" to the execution plan. Commented Nov 13, 2020 at 15:56
  • Can you show the schema of the two dataframes? I'm particularly interested in the countries dataframe. Commented Nov 16, 2020 at 9:34

1 Answer


If you are starting with the two DataFrames as shown in your example, the idiomatic way to obtain your desired output is through a join. (Assuming your map DataFrame is small relative to the sale DataFrame, you can probably get away with a broadcast join.)

from pyspark.sql.functions import broadcast, col, explode
from pyspark.sql.types import IntegerType, MapType, StringType
from pyspark.sql.types import StructType, StructField

# set up data
map_df = spark.createDataFrame(
    [({1: "Spain"},),({2: "Germany"},),({3: "Czech Republic"},),({4: "Malta"},)],
    schema=StructType([StructField("map", MapType(IntegerType(), StringType()))])
)
sale_df = spark.createDataFrame([(1, 200), (2, 565)], ["country_id","Sale"])

# join
sale_df.join(
    broadcast(map_df.select(explode("map").alias("country_id", "country"))), 
    on="country_id",
    how="left"
).select("country", "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#|  Spain| 200|
#|Germany| 565|
#+-------+----+

If, instead, you had your mapping as a single MapType literal (rather than one map per row in a separate DataFrame), you could avoid the join by pushing the evaluation of the map into the execution plan.

from pyspark.sql.functions import array, map_from_arrays, lit

my_dict = {1: "Spain", 2: "Germany", 3: "Czech Republic", 4: "Malta"}
my_map = map_from_arrays(
    array(*map(lit, my_dict.keys())),
    array(*map(lit, my_dict.values()))
)
print(my_map)
#Column<map_from_arrays(array(1, 2, 3, 4), array(Spain, Germany, Czech Republic, Malta))>

Now use getItem in your select statement:

sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#|  Spain| 200|
#|Germany| 565|
#+-------+----+

And the execution plan:

sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").explain()
#== Physical Plan ==
#*(1) Project [keys: [1,2,3,4], values: [Spain,Germany,Czech Republic,Malta][cast(country_id#6L as int)] AS country#62, Sale#7L]
#+- Scan ExistingRDD[country_id#6L,Sale#7L]

Could you transform the data you have from the first form (the DataFrame with a map column) into the second (a single map literal)? Yes - but it's almost surely not worth the overhead to do so.
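
If you did want to try it anyway, here is a rough sketch of that transformation, reusing map_df and sale_df from the example above: collect the (small) map DataFrame to the driver, merge the per-row dicts, and rebuild a single map literal as in the second approach.

from pyspark.sql.functions import array, col, lit, map_from_arrays

# collect the (small) map DataFrame to the driver and merge the per-row dicts
collected = {k: v for row in map_df.collect() for k, v in row["map"].items()}

# rebuild it as a single map literal, as in the second approach above
my_map = map_from_arrays(
    array(*[lit(k) for k in collected]),
    array(*[lit(v) for v in collected.values()])
)

sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").show()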


4 Comments

I am selecting this as the right answer. I think the handling of maps in pyspark should be improved in the future; in some other data processing tools it is much better. For example, in QlikView it is just ApplyMap('map name', value_to_be_mapped, 'value when there is no mapping'). So in my opinion, in the future there should be something like withColumn("country", ApplyMap(...)) (so ApplyMap could be a Spark function).
@HubertDudek IIUC, I think you can get what you want by wrapping a coalesce around the getItem(). Something like: sale_df.withColumn("country", coalesce(my_map.getItem(col("country_id")), lit("value when there is no mapping"))). You can define this as your own implementation of ApplyMap(map, value_to_map, default_value) (a sketch of such a helper follows after these comments).
Or you can use a fillna at the end: sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").fillna("value when there is no map", subset=["country"])
Brilliant answer - now I understand what you meant under 'because you can push the "get value from key" to the execution plan' in the comment section above.
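
For reference, a minimal sketch of the ApplyMap-style helper discussed in the comments above (the name apply_map and its signature are just an illustration, not a built-in Spark function; it reuses my_map and sale_df from the answer):

from pyspark.sql.functions import coalesce, col, lit

def apply_map(map_col, value_col, default_value):
    # hypothetical helper mimicking QlikView's ApplyMap: look up value_col in
    # map_col and fall back to default_value when there is no mapping
    return coalesce(map_col.getItem(value_col), lit(default_value))

sale_df.select(
    apply_map(my_map, col("country_id"), "value when there is no mapping").alias("country"),
    "Sale"
).show()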
