
I'm trying to add a list of columns to an existing Spark DataFrame.

Example code:

from functools import reduce
from pyspark.sql.functions import lit

columns_list = ['col1', 'col2', 'col3', 'col4']
reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df).show()

This gives the expected results.

Will using reduce() on a distributed Spark DataFrame try to execute everything on a single node?

  • any specific reason to use reduce? just df.select("*",*[lit('NULL').alias(i) for i in columns_list]).show() seems to work fine. Commented Apr 24, 2020 at 16:14
  • @anky: that's a nice solution, i love it.. thanks.. Commented Apr 24, 2020 at 17:16

1 Answer


OP asked

Will using reduce() on a distributed Spark DataFrame try to execute everything on a single node?

But I think what OP really wants to know is

whether the following commands differ from a Spark execution standpoint.

Generate toy data

data = [
    ('1',),
    ('2',),
    ('3',),
    ('4',),
]
df = spark.createDataFrame(data, ['id'])

You can see the execution plan of your code using .explain()

Scenario 1 (using functools.reduce)

from functools import reduce
from pyspark.sql.functions import lit

columns_list = ['col1', 'col2', 'col3', 'col4']
result1 = reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df)
result1.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#122, NULL AS col2#125, NULL AS col3#129, NULL AS col4#134]
+- Scan ExistingRDD[id#0]

Scenario 2 (@anky's code using select and list comprehension)

result2 = df.select("*",*[lit('NULL').alias(i) for i in columns_list])
result2.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#140, NULL AS col2#141, NULL AS col3#142, NULL AS col4#143]
+- Scan ExistingRDD[id#0]

Scenario 3 (using for loop and iterative assignment)

result3 = df
for i in columns_list:
    result3 = result3.withColumn(i, lit('NULL'))

result3.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#167, NULL AS col2#170, NULL AS col3#174, NULL AS col4#179]
+- Scan ExistingRDD[id#0]

Note that all three scenarios produce the same physical plan, so functools.reduce() is not required; a plain for loop (Scenario 3) works just as well. OP, I suggest reading about the difference between transformations and actions in Spark. withColumn and select are lazy transformations: these chains only build an execution plan on the driver, and no data is processed on any node until an action such as show() is called. So reduce() itself runs on the driver, but it only chains transformations together; it does not pull your data onto a single node.
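To see why reduce() is just driver-side chaining, here is a minimal plain-Python sketch (no Spark required) of what it does: it threads an accumulator through the iterable exactly like Scenario 3's for loop. The names add_column, via_reduce, and via_loop are made up for illustration; a list of column names stands in for the DataFrame.

from functools import reduce

columns_list = ['col1', 'col2', 'col3', 'col4']

# Stand-in for df.withColumn: returns a new accumulator with one more column name.
add_column = lambda acc, col: acc + [col]

# Scenario 1 style: fold over the column list, starting from an empty accumulator.
via_reduce = reduce(add_column, columns_list, [])

# Scenario 3 style: same thing, written as an explicit loop with reassignment.
via_loop = []
for col in columns_list:
    via_loop = add_column(via_loop, col)

assert via_reduce == via_loop == columns_list

Both spellings build up the same value one step at a time on the driver; with Spark, each step merely adds a projection to the plan.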


1 Comment

Hello, could you please explain why, in your call to reduce, you passed two more arguments after the lambda function? I am struggling with a similar case, but when I check the documentation, reduce only takes a function and an iterable. Thank you very much in advance.
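For anyone else wondering the same thing: functools.reduce accepts an optional third argument, an initializer that seeds the accumulator before folding begins (that is the role df plays in the answer's call). A small plain-Python illustration:

from functools import reduce

# reduce(function, iterable[, initializer]):
# without an initializer, the first element seeds the accumulator.
assert reduce(lambda acc, x: acc + x, [1, 2, 3]) == 6

# With an initializer, folding starts from it (df plays this role above).
assert reduce(lambda acc, x: acc + x, [1, 2, 3], 10) == 16

# With an empty iterable, the initializer is returned unchanged.
assert reduce(lambda acc, x: acc + x, [], 10) == 10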
