I'd like to understand how arguments are passed to functions in PySpark.

My platform is below.

Red Hat Enterprise Linux Server release 6.8 (Santiago)
spark version 1.6.2
python 2.6

I have a function defined in the module basic_lib.py as below.

def selectRowByTimeStamp(x,y):
    if x._1 > y._1:
        return x
    return y

Below is my snippet of the main code

  df2 = df2.map(lambda x: (x._2, x))
  rdd = df2.reduceByKey(basic_lib.selectRowByTimeStamp)

Why does basic_lib.selectRowByTimeStamp above work without its input parameters being specified explicitly?

For example, something like the code below would be easier to understand.

var1 = 1
var2 = 2

rdd = df2.reduceByKey(basic_lib.selectRowByTimeStamp(var1, var2))
  • Please define what you mean by "work". What is your desired outcome? Commented Sep 17, 2016 at 8:18
  • "work" means reduceByKey returns the correct RDD. But I don't understand how selectRowByTimeStamp can return the correct value if the input variables are not written in the source code. Commented Sep 17, 2016 at 8:24
  • Well, reduceByKey takes the processing function as input, and that function in turn operates on the tuples of the input RDD. So reduceByKey doesn't need to know the parameters; the function is applied to the elements of df2 in the same way a lambda function would be. Commented Sep 17, 2016 at 10:03

1 Answer

It looks like you're slightly confused about the purpose of lambda expressions. In general, lambda expressions in Python are used to create anonymous, single-expression functions. Other than that, as far as we care here, they are no different from any other function you define. To quote the docs:

Small anonymous functions can be created with the lambda keyword. (...) Lambda functions can be used wherever function objects are required. Semantically, they are just syntactic sugar for a normal function definition.

Since lambda functions are not special in Python in general, they cannot be special in PySpark (well, they may require some serialization tricks due to their scope, but that is only about their scope). No matter whether a function is defined by lambda or not (or whether it is even a function*), Spark applies it in exactly the same way. So when you call:

df2.map(lambda x: (x._2, x))

the lambda expression is simply evaluated, and what map receives is just another function object. It wouldn't be any different if you assigned it to a name first:

foo = lambda x: (x._2, x)  # Yup, this is against style guide (PEP 8)

or created a standalone function:

def bar(x):
    return x._2, x 

In all three cases the function object is functionally pretty much the same:

import dis

dis.dis(foo)
##   1           0 LOAD_FAST                0 (x)
##               3 LOAD_ATTR                0 (_2)
##               6 LOAD_FAST                0 (x)
##               9 BUILD_TUPLE              2
##              12 RETURN_VALUE

dis.dis(bar)
##   2           0 LOAD_FAST                0 (x)
##               3 LOAD_ATTR                0 (_2)
##               6 LOAD_FAST                0 (x)
##               9 BUILD_TUPLE              2
##              12 RETURN_VALUE

dis.dis(lambda x: (x._2, x))
##   1           0 LOAD_FAST                0 (x)
##               3 LOAD_ATTR                0 (_2)
##               6 LOAD_FAST                0 (x)
##               9 BUILD_TUPLE              2
##              12 RETURN_VALUE
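
The same reasoning applies to reduceByKey in the question: what gets passed is the function object, and the code that receives it supplies the arguments later. Below is a minimal pure-Python sketch of that idea. Note that reduce_by_key is a hypothetical, single-machine stand-in written for illustration (it is not Spark's implementation), and the Row class and sample data are made up to mimic rows with _1 and _2 fields.

```python
class Row(object):
    # Hypothetical stand-in for a DataFrame row: _1 is a timestamp, _2 is a key.
    def __init__(self, _1, _2):
        self._1 = _1
        self._2 = _2


def select_row_by_timestamp(x, y):
    # Same logic as selectRowByTimeStamp in the question.
    if x._1 > y._1:
        return x
    return y


def reduce_by_key(pairs, func):
    # Simplified imitation of reduceByKey (an assumption for illustration).
    # func arrives as a plain reference; this loop decides when to call it
    # and which two values to pass in.
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value
    return merged


rows = [Row(1, "a"), Row(5, "a"), Row(3, "b"), Row(2, "a")]
pairs = [(r._2, r) for r in rows]

# Pass the function itself: no parentheses, no arguments.
result = reduce_by_key(pairs, select_row_by_timestamp)
print(result["a"]._1)
```

Writing reduce_by_key(pairs, select_row_by_timestamp(var1, var2)) instead would call the function immediately and hand over its return value, which is not a function that can be applied to each pair of rows.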

On a side note, if the input is a DataFrame, it is much more efficient to solve this using Spark SQL. It is also better to extract the rdd before you use map, to ensure forward compatibility. Finally, a Row is just a tuple.

So ideally you could write:

df.groupBy("_2").max()

but if you really want to use RDD API:

df.select("_2", "_1").rdd.reduceByKey(max)
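
Why the built-in max can be passed directly can be illustrated without Spark. In this sketch the (key, value) tuples stand in for the rows produced by select("_2", "_1"); the sample data is made up, and grouping plus functools.reduce is used only to imitate merging the values of each key pairwise.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter

# Hypothetical (key, timestamp) pairs, i.e. (_2, _1) from the question.
pairs = [("a", 1), ("b", 3), ("a", 5), ("a", 2)]

latest = {}
by_key = sorted(pairs, key=itemgetter(0))
for key, group in groupby(by_key, key=itemgetter(0)):
    values = [value for _, value in group]
    # max is an ordinary two-argument callable, so it can be handed to
    # reduce in the same way as a lambda or a def-defined function.
    latest[key] = reduce(max, values)

print(latest)
```

As in the one-liner above, this keeps only the largest _1 per key rather than the whole row.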

* In practice any callable object will work as long as it accepts the given arguments. For example (not that it makes much sense here), you could replace the function with an instance of a class defined as follows:

class FooBar(object):
    def __call__(self, x):
        return x._2, x

df2.rdd.map(FooBar())
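
A quick check outside Spark shows that the three kinds of callable behave identically when the caller supplies the argument. The Row class here is a hypothetical stand-in for a DataFrame row, used only so the sketch runs without PySpark.

```python
class Row(object):
    # Hypothetical stand-in for a row with fields _1 and _2.
    def __init__(self, _1, _2):
        self._1 = _1
        self._2 = _2


foo = lambda x: (x._2, x)


def bar(x):
    return x._2, x


class FooBar(object):
    def __call__(self, x):
        return x._2, x


row = Row(10, "a")

# The caller neither knows nor cares how each callable was defined.
callables = [foo, bar, FooBar()]
results = [f(row) for f in callables]
print(all(r == ("a", row) for r in results))
```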

1 Comment

Due to my lack of experience writing PySpark and Python, learning that lambda and def are the same is new to me. I also found your description in the Spark documentation (spark.apache.org/docs/1.6.2/programming-guide.html#basics). Also, combining this with the question below gave me a clearer understanding (stackoverflow.com/questions/25091091/…). That question taught me that reduceByKey takes two arguments, which are (K1,V1) and (K2,V2).
