I have a question about helper functions in PySpark for a specific case. I'm trying to write a simple reusable function to aggregate values at different levels and over different groups. The inputs should be:
- Existing dataframe
- Variables for group by (either single column or a list)
- Variables to be aggregated (same as above)
- Function to be applied (either a specific one or a list of them). I kept it simple: sum, avg, min, max, etc.
I got it to work for the cases where I have either a single function or a list of functions, but I get stuck when trying to pass a list of aggregation variables to the function.
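A side note on dispatching on `hasattr(x, '__iter__')`: in Python 3 a plain string is itself iterable, so that check cannot tell a single column name apart from a list of names. A quick check (the column names here are made up):

```python
# In Python 3 strings are iterable, so '__iter__' cannot distinguish
# a single column name from a list of column names.
single = "sales"           # hypothetical single column name
many = ["sales", "qty"]    # hypothetical list of columns

print(hasattr(single, '__iter__'))   # True -- a string is iterable too
print(hasattr(many, '__iter__'))     # True
print(isinstance(single, list))      # False -- isinstance separates the cases
print(isinstance(many, list))        # True
```

Testing `isinstance(x, list)` instead routes single column names and lists to the intended branches.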
```python
from pyspark.sql.functions import col, sum, avg, min, max, count, countDistinct

def aggregate(dataframe, grouping, aggregation, functions):
    # First part works fine for a single column and a single function.
    # Note: isinstance(x, list) instead of hasattr(x, '__iter__'), because
    # strings are iterable too and would otherwise take the wrong branch.
    if not isinstance(aggregation, list) and not isinstance(functions, list):
        if functions == sum:
            df = dataframe.groupby(grouping).sum(aggregation)
        elif functions == avg:
            df = dataframe.groupby(grouping).avg(aggregation)
        elif functions == min:
            df = dataframe.groupby(grouping).min(aggregation)
        elif functions == max:
            df = dataframe.groupby(grouping).max(aggregation)
        elif functions == count:
            df = dataframe.groupby(grouping).count()  # count() takes no column
        elif functions == countDistinct:
            # countDistinct is not a GroupedData method; it goes through agg()
            df = dataframe.groupby(grouping).agg(countDistinct(aggregation))
    # Here is the part I struggled with: when aggregation is a list, the
    # GroupedData methods need it unpacked with *, not passed as one argument.
    elif isinstance(aggregation, list) and not isinstance(functions, list):
        if functions == sum:
            df = dataframe.groupby(grouping).sum(*aggregation)
        elif functions == avg:
            df = dataframe.groupby(grouping).avg(*aggregation)
        elif functions == min:
            df = dataframe.groupby(grouping).min(*aggregation)
        elif functions == max:
            df = dataframe.groupby(grouping).max(*aggregation)
        elif functions == count:
            df = dataframe.groupby(grouping).count()
        elif functions == countDistinct:
            df = dataframe.groupby(grouping).agg(countDistinct(*aggregation))
    # Building the expressions explicitly works when both inputs are lists.
    else:
        expression_def = [f(col(c)) for f in functions for c in aggregation]
        df = dataframe.groupby(grouping).agg(*expression_def)
    return df
```