
I have a question about UDFs in PySpark and a specific case. I'm trying to write a simple reusable function that aggregates values at different levels and for different groups. The inputs should be:

  1. Existing dataframe
  2. Variables for group by (either single column or a list)
  3. Variables to be aggregated (same as above)
  4. Function to be applied (either a specific one or a list of them). I kept it simple: sum, avg, min, max, etc.

I got it to work for the cases where I have either a single function or a list of functions, but when the aggregation variables come as a list I get stuck on passing them to the function.

from pyspark.sql.functions import avg, col, count, countDistinct

def aggregate(dataframe, grouping, aggregation, functions):

    # First part works OK for a single function and a single column.
    # isinstance is used instead of hasattr(x, '__iter__') because plain
    # strings are iterable in Python 3, which breaks that check.
    if not isinstance(aggregation, (list, tuple)) and not isinstance(functions, (list, tuple)):
        if functions == sum:
            df = dataframe.groupby(grouping).sum(aggregation)
        elif functions == avg:
            df = dataframe.groupby(grouping).avg(aggregation)
        elif functions == min:
            df = dataframe.groupby(grouping).min(aggregation)
        elif functions == max:
            df = dataframe.groupby(grouping).max(aggregation)
        elif functions == count:
            # GroupedData has no count(col)/countDistinct(col), so go through agg()
            df = dataframe.groupby(grouping).agg(count(aggregation))
        elif functions == countDistinct:
            df = dataframe.groupby(grouping).agg(countDistinct(aggregation))

    # Here is the part I struggle with: if aggregation == [some list],
    # the calls below will not work.
    elif isinstance(aggregation, (list, tuple)) and not isinstance(functions, (list, tuple)):
        if functions == sum:
            df = dataframe.groupby(grouping).sum(aggregation)
        elif functions == avg:
            df = dataframe.groupby(grouping).avg(aggregation)
        elif functions == min:
            df = dataframe.groupby(grouping).min(aggregation)
        elif functions == max:
            df = dataframe.groupby(grouping).max(aggregation)
        elif functions == count:
            df = dataframe.groupby(grouping).count(aggregation)
        elif functions == countDistinct:
            df = dataframe.groupby(grouping).countDistinct(aggregation)

    # Expression to get the inputs as lists works too.
    else:
        expression_def = [f(col(c)) for f in functions for c in aggregation]
        df = dataframe.groupby(grouping).agg(*expression_def)
    return df
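For reference, the failing branch comes down to how the GroupedData methods take their arguments: sum(), avg(), min() and max() expect column names as separate arguments, not as a single list. A minimal sketch of the difference (the toy dataframe and the SparkSession here are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumed session for this sketch
df = spark.createDataFrame([("a", 1, 2), ("a", 3, 4)], ["g", "x", "y"])

df.groupby("g").sum("x", "y").show()     # works: names passed separately
# df.groupby("g").sum(["x", "y"])        # errors: a list is not a column name
df.groupby("g").sum(*["x", "y"]).show()  # works: unpack the list with *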

2 Answers


You can use the agg method:

def aggregate(dataframe, grouping, aggregation, functions):
    # functions is the aggregate function's name as a string, e.g. "sum";
    # isinstance avoids treating a single column name (a string) as a list
    if isinstance(aggregation, (list, tuple)):
        return dataframe.groupBy(grouping).agg({item: functions for item in aggregation})
    else:
        return dataframe.groupBy(grouping).agg({aggregation: functions})
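A quick usage sketch for this function (the sample data and column names here are made up, and the aggregate function is passed by name as a string):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1, 2), ("a", 3, 4)], ["g", "x", "y"])

aggregate(df, "g", ["x", "y"], "sum").show()  # builds {"x": "sum", "y": "sum"}
aggregate(df, "g", "x", "avg").show()         # single column, single function

Note that the dict form of agg() applies exactly one function per column, so for several functions over the same column you would need the column-expression form instead.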

You can save yourself a lot of writing by just using the agg() function.

Example:

grouping = ["A", "B", "C"]
aggregation = {"A": "max", "B": "avg"}
df = dataframe.groupBy(grouping).agg(aggregation)

That also covers aggregation over multiple columns, because you can pass a dict that maps each column to its aggregation function.

In your case that would look something like:

aggregation = {"A": "max", "B": "max": "C":"max"}

Ref.: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.DataFrame.agg
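Put together as a self-contained sketch (the session setup and sample data are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dataframe = spark.createDataFrame(
    [("a", "x", 1, 10.0), ("a", "x", 3, 20.0)],
    ["A", "B", "C", "D"],
)

aggregation = {"C": "max", "D": "avg"}
dataframe.groupBy(["A", "B"]).agg(aggregation).show()
# result columns: A, B, max(C), avg(D)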

