
I defined a pandas UDF and want to pass additional arguments to it besides the pandas.Series or pandas.DataFrame. I tried to use functools.partial for that, but it fails. My code is below:

from functools import partial

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

conf = SparkConf().setMaster("local[*]").setAppName("test")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

df = spark.createDataFrame([(1, 2), (1, 4), (2, 6), (2, 4)], schema=["x", "y"])

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def f(pdf, z):
    y = pdf.y * 2 + z
    return pdf.assign(y=y)

df.groupBy(df.x).apply(partial(f, z=100)).show()

and the traceback:

Traceback (most recent call last):
  File "test.py", line 140, in <module>
    df.groupBy(df.x).apply(partial(f, z=100)).show()
  File "/usr/local/python3/lib/python3.5/site-packages/pyspark/sql/group.py", line 270, in apply
    or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
AttributeError: 'functools.partial' object has no attribute 'evalType'

Is there anything wrong with it?

  • Which Spark distribution are you using? The latest Spark version doesn't seem to have any function called PandasUDFType. Commented Dec 18, 2019 at 5:21
  • The version is 2.4.3. @Ajinkya Bhore Commented Dec 18, 2019 at 6:20
  • I really don't know how to create an object of PandasUDFType, like PandasUDFType().GROUP_MAP. Commented Dec 18, 2019 at 7:31
  • Please mention the expected output in the question. Commented Dec 18, 2019 at 9:21

2 Answers


You cannot wrap a pandas_udf in a partial function because apply will no longer recognize it as a pandas_udf. The error says that the functools.partial object has no attribute evalType; that attribute is available only on the pandas_udf object.

Instead, you can write a factory function that returns the right pandas_udf for a given z:

df = spark.createDataFrame([(1, 2), (1, 4), (2, 6), (2, 4)], schema=["x", "y"])

def f(z):
    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def _internal_udf(pdf):
        y = pdf.y * 2 + z
        return pdf.assign(y=y)
    return _internal_udf

df.groupBy(df.x).apply(f(z=100)).show()

1 Comment

Works like a charm, thanks for this answer! I spent hours trying things out. I can also say that this approach works for SCALAR (see the sketch below).
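
A minimal sketch of the same factory pattern for a SCALAR pandas_udf, as the comment suggests (assuming Spark 2.4.x; the helper name scale_and_shift and the output column y2 are illustrative, not from the original answer):

from pyspark.sql.functions import pandas_udf, PandasUDFType

def scale_and_shift(z):
    # hypothetical factory: returns a SCALAR pandas_udf with z captured by the closure
    @pandas_udf("long", PandasUDFType.SCALAR)
    def _internal_udf(s):
        # s is a pandas.Series; the result must be a Series of the same length
        return s * 2 + z
    return _internal_udf

df.withColumn("y2", scale_and_shift(100)(df.y)).show()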

This can be done in PySpark 3.0 using the grouped map Pandas Function API (applyInPandas).

from functools import partial

df = spark.createDataFrame([(1, 2), (1, 4), (2, 6), (2, 4)], schema=["x", "y"])

def f(pdf, z):
    y = pdf.y * 2 + z
    return pdf.assign(y=y)

partial_f = partial(f, z=100)

df.groupBy(df.x).applyInPandas(partial_f, df.schema).show()

+---+---+
|  x|  y|
+---+---+
|  1|104|
|  1|108|
|  2|112|
|  2|108|
+---+---+

The @pandas_udf decorator should not be used on the function declaration. Spark calls pandas_udf on your function behind the scenes when using the Pandas Function APIs.
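
If you prefer to avoid functools.partial altogether, a closure works equally well with applyInPandas. A minimal sketch under the same Spark 3.0+ assumption (the factory name make_f is illustrative):

def make_f(z):
    def _f(pdf):
        # pdf is the pandas.DataFrame for one group; z is captured by the closure
        return pdf.assign(y=pdf.y * 2 + z)
    return _f

df.groupBy(df.x).applyInPandas(make_f(100), df.schema).show()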

