
I have broadcast a dictionary that I would like to use for mapping column values in my DataFrame. Let's say I call the withColumn() method for that.

I can only get it to work with a UDF, but not directly:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType

sc = SparkContext()
ss = SparkSession(sc)
df = ss.createDataFrame(["a", "b"], StringType()).toDF("key")
# +---+                                                                           
# |key|
# +---+
# |  a|
# |  b|
# +---+
thedict={"a":"A","b":"B","c":"C"}
thedict_bc=sc.broadcast(thedict)

Referencing with a literal or using a UDF works fine:

df.withColumn('upper',lit(thedict_bc.value.get('c',"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    C|
# |  b|    C|
# +---+-----+
df.withColumn('upper',udf(lambda x : thedict_bc.value.get(x,"--"), StringType())('key')).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    A|
# |  b|    B|
# +---+-----+

However, accessing the dictionary directly from within the expression doesn't:

df.withColumn('upper',lit(thedict_bc.value.get(col('key'),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key,"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key.cast("string"),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+

Am I missing something obvious?

1 Answer


TL;DR You're mixing up things that belong to completely different contexts: symbolic SQL expressions (lit, col, etc.) and plain Python code.

You are mixing up the contexts. The following line:

thedict_bc.value.get(col('key'), "--")

is executed in Python on the driver and is literally a local dictionary lookup. thedict doesn't contain col('key') as a key (it is a Column object taken literally; there is no expansion into per-row values involved), so you always get the default value.
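
To see concretely what happens, here is a minimal sketch reusing the question's names (exact repr output varies by PySpark version):

from pyspark.sql.functions import col, lit

thedict = {"a": "A", "b": "B", "c": "C"}

# col('key') is a symbolic expression object built on the driver;
# it is never expanded into the row values "a"/"b" at this point.
print(type(col('key')))    # <class 'pyspark.sql.column.Column'>

# The lookup below therefore runs once, locally, before Spark sees
# anything, and falls through to the default (on some Python/PySpark
# combinations it may even raise TypeError, as Column is unhashable):
resolved = thedict.get(col('key'), "--")
print(lit(resolved))       # a constant "--" column, the same for every row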

Personally I would use a simple join:

lookup = sc.parallelize(list(thedict.items())).toDF(["key", "upper"])
df.join(lookup, ["key"], "left").na.fill("--", ["upper"]).show()
+---+-----+                                                                     
|key|upper|
+---+-----+
|  b|    B|
|  a|    A|
+---+-----+
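
Since the lookup table is tiny (presumably why it was broadcast in the first place), you can also hint a map-side join explicitly. Note this is pyspark.sql.functions.broadcast, the join hint, not the SparkContext.broadcast from the question:

from pyspark.sql.functions import broadcast

# Ship the small lookup table to every executor instead of shuffling df:
df.join(broadcast(lookup), ["key"], "left").na.fill("--", ["upper"]).show()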

but udf (as you've already established) or a literal map would work as well:

from pyspark.sql.functions import coalesce, create_map, lit, col
from itertools import chain

thedict_col = create_map(*chain.from_iterable(
    (lit(k), lit(v)) for k, v in thedict.items()
))

df.withColumn('upper', coalesce(thedict_col[col("key")], lit("--"))).show()
+---+-----+
|key|upper|
+---+-----+
|  a|    A|
|  b|    B|
+---+-----+

Notes:

  • Of course if you want to convert to upper case, just use pyspark.sql.functions.upper.
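  • One trade-off worth noting: create_map embeds every key/value pair as literals in the query plan, so for a very large dictionary the join (or the broadcast variable inside a udf) tends to scale better.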
  • Using some_broadcast.value as an argument for the function won't work at all. Variable substitution will be applied locally and broadcasting won't be utilized. value should be called in the function body, so it is executed in the executor context; see the sketch below.
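
A quick sketch of that last point, reusing the question's names (the anti-pattern still runs for a small dict, but ships the dict inside the closure instead of using the broadcast machinery):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Anti-pattern: .value is resolved here, on the driver, so the plain
# dict is captured in the closure and the broadcast is bypassed.
local_map = thedict_bc.value
bad_udf = udf(lambda x: local_map.get(x, "--"), StringType())

# Correct: capture the broadcast handle and call .value inside the
# function body, i.e. in the executor context.
good_udf = udf(lambda x: thedict_bc.value.get(x, "--"), StringType())

df.withColumn('upper', good_udf('key')).show()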

4 Comments

Thank you. Your explanation makes sense, and yes, I'm a bit hazy on when which context applies. My actual dictionary (a parameter structure) is a multilevel one. For nested dictionaries I figured I can just keep using create_map, as you suggested, but what if my parameter dictionary looks like this: thedict={"a": ("A","AA","AAA"), "b": ("B","BB","BBB")}? How do I encode it to access thedict["a"][2]? I'm trying to stay away from UDFs for speed reasons.
Nested dictionaries work pretty much the same way. You can check my answers here and here, and structs are just Columns, so create_map(lit("a"), struct(lit("A"), lit("AA"), lit("AAA")), lit("b"), struct(lit("B"), lit("BB"), lit("BBB"))) would work, but for the thedict["a"][2] indexing syntax you'll need arrays: create_map(lit("a"), array(lit("A"), lit("AA"), lit("AAA")), lit("b"), array(lit("B"), lit("BB"), lit("BBB"))). With from pyspark.sql.functions import create_map, struct, array, lit.
Apologies for pushing. With your last suggestion I felt I came close to my final goal: a broadcast parameter structure of the form {"a": {"first": (0.1,0.2,0.3), "second": 0.4}, "b": {"first": (0.11,0.22,0.33), "second": 0.44}}. But then I started getting errors like this: cannot resolve 'map('first', array(0.1D, 0.2D, 0.3D), 'second', 0.4)' due to data type mismatch: The given values of function map should all be the same type, but they are [array<double>, double]. Does this mean a heterogeneous parameter structure like this is not the right solution?
That's correct, heterogeneous structures are not supported. If the overall structure is fixed you can use struct, or arrays like this: create_map(lit("a"), create_map(lit("first"), array(lit(0.1), lit(0.2), lit(0.3)), lit("second"), array(lit(0.4)))). A runnable version is sketched below.
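
Pulling the comment thread together, a minimal runnable sketch of that homogenized structure (the scalar "second" values are wrapped in one-element arrays so the map's value type is uniform; df is the one from the question):

from pyspark.sql.functions import array, col, create_map, lit

# Every leaf is an array<double>, so the nested map's value type is uniform.
params = create_map(
    lit("a"), create_map(
        lit("first"),  array(lit(0.1), lit(0.2), lit(0.3)),
        lit("second"), array(lit(0.4))),
    lit("b"), create_map(
        lit("first"),  array(lit(0.11), lit(0.22), lit(0.33)),
        lit("second"), array(lit(0.44))))

# The column equivalent of thedict["a"]["first"][2]:
df.withColumn("p", params[col("key")]["first"][2]).show()
# 0.3 for key "a", 0.33 for key "b"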
