I am trying to invoke a UDF that uses a broadcast object in PySpark.

Here is a minimal example, run in an interactive PySpark session (so sc is the SparkContext), that reproduces the situation and error:

import pyspark.sql.functions as sf
from pyspark.sql.types import LongType


class SquareClass:
    def compute(self, n):
        return n ** 2


square = SquareClass()
square_sc = sc.broadcast(square)

def f(n):
    return square_sc.value.compute(n)  

numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
f_udf = sf.udf(f, LongType())  

numbers.select(f_udf(numbers.id)).show(10)

The stacktrace and error message that this snippet produces:

Traceback (most recent call last)
<ipython-input-75-6e38c014e4b2> in <module>()
     13 f_udf = sf.udf(f, LongType())
     14 
---> 15 numbers.select(f_udf(numbers.id)).show(10)

/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    255         +---+-----+
    256         """
--> 257         print(self._jdf.showString(n, truncate))
    258 
    259     def __repr__(self):

/usr/local/lib/python3.5/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, 

<snip>

An error occurred while calling o938.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 587, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
Comment: You didn't provide the full trace, but it seems to be an AttributeError.

2 Answers


When you access the attributes of square_sc.value, the worker has to unpickle the broadcast object, which requires the class SquareClass, and that class is not present on the workers.

If you want to use a Python package, class, or function in a UDF, the workers must be able to access it. You can achieve this by putting the code in a Python script and deploying it with --py-files when running spark-submit or pyspark.
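For example, a minimal sketch of that approach, assuming the class lives in a file named square_module.py (a hypothetical name) next to the driver script:

# square_module.py -- hypothetical module shipped to the workers
class SquareClass:
    def compute(self, n):
        return n ** 2

# Deploy it so the workers can import it when unpickling the broadcast:
#   spark-submit --py-files square_module.py my_job.py
# or start an interactive shell with it attached:
#   pyspark --py-files square_module.py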


One thing you can do is keep the class in a separate module and add that module to the SparkContext.

class_module.py

class SquareClass:
    def compute(self, n):
        return n ** 2

In the pyspark shell:

import pyspark.sql.functions as sf
from pyspark.sql.types import LongType
from class_module import SquareClass

sc.addFile('class_module.py')

square = SquareClass()
square_sc = sc.broadcast(square)

def f(n):
    return square_sc.value.compute(n)

f_udf = sf.udf(f, LongType())
numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
numbers.select(f_udf(numbers.id)).show(10)

Output:

+-----+
|f(id)|
+-----+
|    0|
|    1|
|    4|
|    9|
|   16|
|   25|
|   36|
|   49|
|   64|
|   81|
+-----+
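As a side note, SparkContext also provides addPyFile, which both ships the file and puts it on the executors' import path; a minimal variant of the same setup, assuming class_module.py is in the current working directory:

import pyspark.sql.functions as sf
from pyspark.sql.types import LongType

# addPyFile ships class_module.py and adds it to the workers' sys.path,
# so the broadcast SquareClass instance can be unpickled on the executors.
sc.addPyFile('class_module.py')
from class_module import SquareClass

square_sc = sc.broadcast(SquareClass())
f_udf = sf.udf(lambda n: square_sc.value.compute(n), LongType())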
