I am using a Pandas UDF on PySpark.
I have a main file `__main__.py` with:
```python
from pyspark.sql import SparkSession
from run_udf import compute


def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()
```
And a `run_udf.py` file that contains my UDF function and another function (which multiplies a single variable by 2):
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType


def multi_by_2(x):
    return 2 * x


def compute(df):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)
    return df
```
By running `__main__.py` I get the following error: "No module named 'run_udf'". In this configuration, `subtract_mean()` does not seem to be able to access `multi_by_2()`. I found two workarounds, but I don't know whether they follow best-practice standards:
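Part of what is happening here: pickle serializes a module-level function by reference, storing only its module and qualified name rather than its code, so the executor that unpickles the UDF must be able to import `run_udf` itself (functions defined inside `compute`, as in Method 1 below, are instead serialized by value by Spark's cloudpickle). A minimal sketch with the standard-library `pickle`, using a stand-in helper:

```python
import pickle

# Stand-in for the module-level multi_by_2 defined in run_udf.py.
def multi_by_2(x):
    return 2 * x

payload = pickle.dumps(multi_by_2)

# Only the function's name (and its module) is stored in the stream,
# not its body, so whoever unpickles it must be able to import the
# original definition — exactly what the Spark executor fails to do.
print(b"multi_by_2" in payload)  # True
```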
Method 1: (move the function inside compute - not ideal, as I would have to copy the function each time I use another pandas_udf() function - we lose the concept of a 'reusable' function).
```python
def compute(df):
    def multi_by_2(x):
        return 2 * x

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)
    return df
```
Method 2: Pass the multiplying function as a parameter of compute.
`__main__.py`:
```python
from pyspark.sql import SparkSession
from run_udf import compute


def multi_by_2(x):
    return 2 * x


def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df, multi_by_2)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()
```
`run_udf.py`:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType


def compute(df, multi_by_2):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)
    return df
```
The two solutions I found seem a bit hacky. Is there a better way to tackle this issue?
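For reference, a common approach that avoids restructuring the code at all is to ship `run_udf.py` to the executors explicitly at submit time (a sketch, assuming `__main__.py` and `run_udf.py` sit in the same directory):

```shell
# Distribute run_udf.py to the executors so they can import it:
spark-submit --py-files run_udf.py __main__.py
```

Equivalently, calling `spark.sparkContext.addPyFile("run_udf.py")` on the driver before the UDF runs has the same effect from within the program.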