
Could someone please help me understand the behaviour of chaining map calls on an RDD inside a Python for loop?

For the following code:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))

rdd.collect()

I get the output:

[[1, 2, 2, 2], [2, 2, 2, 2], [3, 2, 2, 2]]

Whereas with the following code:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()

I get the expected output:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

I imagine this has something to do with how the closure is passed to PySpark, but I can't find any documentation about this...
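
For comparison, here is a minimal pure-Python sketch (no Spark involved) that seems to show the same late-binding behaviour of closures:

def appender(l, i):
    return l + [i]

# Build one function per loop iteration; each lambda closes over
# the variable i itself, not over its value at creation time.
funcs = []
for i in range(3):
    funcs.append(lambda x: appender(x, i))

# Every lambda sees the final value of i (2):
print([f([0]) for f in funcs])  # [[0, 2], [0, 2], [0, 2]]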

2 Answers


The solution is to bind the loop variable (in this case i) as a default argument of the lambda, so that each lambda captures the value of i at definition time rather than looking it up later. This can be accomplished by

for i in range(3):
    # i=i binds the current value of i as a default argument
    rdd = rdd.map(lambda x, i=i: appender(x, i))

More information about this can be found at lambda function accessing outside variable.
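
Alternatively, functools.partial binds the current value of i at definition time; a minimal sketch of the same fix (my variant, not tested on a distributed cluster):

from functools import partial

for i in range(3):
    # partial freezes the current i into a new callable, so each
    # map stage carries its own bound copy of the value
    rdd = rdd.map(partial(appender, i=i))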

Interestingly, at least on a local cluster (I have not tested this on a distributed cluster), the problem can also be addressed by persisting the intermediate RDD, presumably because persist() touches the underlying Java RDD and thereby serializes the pipelined function while i still holds its current value:

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))
    rdd.persist()

Both solutions produce:

[[1, 0, 1, 2], [2, 0, 1, 2], [3, 0, 1, 2]] 
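
If persist alone does not pin the value on some setup, a variant (my assumption, not part of the tested code above) is to force evaluation with an action inside the loop, so each map stage actually runs before the loop rebinds i:

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))
    rdd = rdd.persist()
    rdd.count()  # an action: materializes and caches this stage now,
                 # while i still has its current value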


My best guess is that this is due to lazy evaluation. Also, your range is off: range(3) yields 0, 1, 2, while your expected output appends 1, 2, 3.

These two code snippets produce the same output:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(1, 4):
    # collect() forces evaluation while i has its current value; the
    # result is then re-parallelized into a fresh RDD for the next pass
    rdd = spark.sparkContext.parallelize(rdd.map(lambda x: appender(x, i)).collect())

rdd.collect()

outputs:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

And the second one:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()

outputs:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

Also, to show what happens inside the for loop, here is a simplified example (with only the inputs 1 and 2) using a modified appender function that prints its l argument:

  1. The for loop prints:

    [2]
    [2, 2]
    [1]
    [3]
    [1, 2]
    [3, 2]
    

Note that the first printed lines come from the second input element (partition order is not fixed), and every map stage appends 2, the final value of i.

  2. Explicit chaining of the mappers prints:

    [1]
    [1, 1]
    [2]
    [2, 1]
    [3]
    [3, 1]
    
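
For reference, the modified appender used above might look like this minimal sketch (the exact original is not shown, so this is a guess):

def appender(l, i):
    print(l)  # runs on the executors; with a local master the output
              # shows up in the driver console, in no fixed order
    return l + [i]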

3 Comments

Hmm, nothing wrong with that range from my (and my Python interpreter's) perspective. docs.python.org/2/library/functions.html#range
Parallelizing the rdd.map function is certainly not what I want to do either. parallelize() should be used to distribute an existing collection over the cluster. Bear in mind that this is just test pseudo-code.
In Python 2.7.12, `for i in range(3): print(i)` prints 0 1 2, while in your second snippet you put in the input numbers 1, 2, 3.
