I am using Pandas UDFs in PySpark.

I have a main file __main__.py with:

from pyspark.sql import SparkSession
from run_udf import compute


def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()

And a run_udf.py file that contains my UDF function and another function (that multiplies a single variable by 2):

from pyspark.sql.functions import pandas_udf, PandasUDFType


def multi_by_2(x):
    return 2 * x


def compute(df):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df
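Since subtract_mean assigns v = multi_by_2(v) - v.mean(), i.e. 2*v minus the per-group mean, this is the output I would expect from df.show() (a sketch; the row order may vary after the groupby):

+---+----+
| id|   v|
+---+----+
|  1| 0.5|
|  1| 2.5|
|  2| 0.0|
|  2| 4.0|
|  2|14.0|
+---+----+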

By running __main__.py, however, I get the following error: "No module named 'run_udf'". Presumably the UDF gets serialized and shipped to the executors, which cannot import run_udf, so subtract_mean() cannot access multi_by_2(). I found two workarounds but don't know whether they follow best practices:

Method 1: move the function inside compute. Not ideal, as I would have to copy the function into every other pandas_udf() function I write, losing the concept of a 'reusable' function.

def compute(df):
    def multi_by_2(x):
        return 2 * x

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df

Method 2: pass the multiplying function as a parameter of compute.

__main__.py

from pyspark.sql import SparkSession
from run_udf import compute
def multi_by_2(x):
    return 2 * x

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df, multi_by_2)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()

run_udf.py

from pyspark.sql.functions import pandas_udf, PandasUDFType

def compute(df, multi_by_2):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df

The two solutions I found seem to be a bit hacky. Is there any better way to tackle this issue?

1 Answer

I know this reply comes a while after you've posted the question but I hope it can still be helpful!

What is the reason you want to wrap this in a nested function? Also, passing a Spark DataFrame as a function argument is not commonly done as far as I am aware, so maybe you could try something like the following for your main script:

from pyspark.sql import SparkSession
from run_udf import subtract_mean_udf

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = df.groupby("id").apply(subtract_mean_udf)
    df.show()
    spark.stop()

if __name__ == "__main__":
    main()

And the following for the run_udf.py script:

from pyspark.sql.functions import pandas_udf, PandasUDFType

def multi_by_2(x):
    return 2 * x

# the output schema has to be written out here, since no df is in scope in this module
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean_udf(pdf):
    # pdf is a pandas.DataFrame
    return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())

Most of this information is taken from a Databricks notebook on Pandas UDFs.

You could probably also get away with

return pdf.assign(v=pdf.v*2 - pdf.v.mean())

but I haven't tested that so I am not 100% sure.
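One more thought: if the "No module named 'run_udf'" error comes from the executors not having run_udf.py on their Python path, a common remedy in Spark (a sketch here, not something I have tested against your setup) is to ship the file explicitly with SparkContext.addPyFile, which would let you keep multi_by_2 and the UDF as reusable module-level functions in run_udf.py:

from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.getOrCreate()
    # ship run_udf.py to every executor so the UDF can import it;
    # assumes run_udf.py sits next to __main__.py
    spark.sparkContext.addPyFile("run_udf.py")
    from run_udf import subtract_mean_udf  # import after addPyFile
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df.groupby("id").apply(subtract_mean_udf).show()
    spark.stop()

if __name__ == "__main__":
    main()

The same effect can be achieved at submit time with spark-submit --py-files run_udf.py __main__.py.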
