
Using a Spark DataFrame, I need to compute a percentage with the following formula:

Group by "KEY" and calculate "re_pct" as (sum(SA) / sum(SA / (PCT / 100))) * 100
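Since every sample row below carries the same PCT, the formula should reproduce that PCT. A plain-Scala sanity check of the arithmetic (no Spark needed, values taken from the sample input):

```scala
// Formula: (sum(SA) / sum(SA / (PCT / 100))) * 100, checked on the sample rows.
val rows = Seq((20000.0, 45.30), (30000.0, 45.30))

val sumSa = rows.map(_._1).sum                                // 50000.0
val sumSaOverPct = rows.map { case (sa, pct) => sa / (pct / 100) }.sum

val rePct = sumSa / sumSaOverPct * 100
// All rows carry PCT = 45.30, so rePct comes out as 45.3 up to
// floating-point noise (hence the long decimal in the expected result).
```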

For instance, the input DataFrame is:

// toDF on a local collection requires spark.implicits._ to be in scope
import spark.implicits._

val values1 = List(List("01", "20000", "45.30"), List("01", "30000", "45.30"))
  .map(row => (row(0), row(1), row(2)))

val DS1 = values1.toDF("KEY", "SA", "PCT")
DS1.show()

+---+-----+-----+
|KEY|   SA|  PCT|
+---+-----+-----+
| 01|20000|45.30|
| 01|30000|45.30|
+---+-----+-----+

Expected Result:

+---+--------------+
|KEY|       re_pcnt|
+---+--------------+
| 01|45.30000038505|
+---+--------------+

I have tried to calculate it as below:

val result = DS1.groupBy("KEY").agg(((sum("SA").divide(
  sum(
    ("SA").divide(
      ("PCT").divide(100)
    )
  )
)) * 100).as("re_pcnt"))

But I am facing: Error:(36, 16) value divide is not a member of String ("SA").divide({

Any suggestions on implementing the above logic?

2 Answers


You can try importing spark.implicits._ and then use $ to refer to a column.

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val result = DS1.groupBy("KEY")
  .agg(((sum($"SA").divide(sum(($"SA").divide(($"PCT").divide(100))))) * 100)
  .as("re_pcnt"))

Which will give you the requested output.

If you do not want to import spark.implicits._, you can always use the col() function instead of $.
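For example, the same aggregation written with col() (a sketch; it assumes the DS1 DataFrame from the question and a running SparkSession):

```scala
import org.apache.spark.sql.functions.{col, sum}

// Same formula as above, using col("...") instead of $"..."
val result = DS1.groupBy("KEY")
  .agg((sum(col("SA")).divide(sum(col("SA").divide(col("PCT").divide(100)))) * 100)
    .as("re_pcnt"))
```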


It is possible to use a string as input to the agg() function with the use of expr(). However, the input string needs to be changed a bit. The following gives exactly the same result as before, but uses a string instead:

val opr = "sum(SA)/(sum(SA/(PCT/100))) * 100"
val df = DS1.groupBy("KEY").agg(expr(opr).as("re_pcnt"))

Note that .as("re_pcnt") needs to be inside the agg() method; it cannot be outside.
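The same idea can also be expressed through the SQL API directly (a sketch; it assumes a SparkSession named spark and the DS1 DataFrame from the question):

```scala
// Register the DataFrame as a temporary view and run the whole
// expression, including the alias, as a single SQL statement.
DS1.createOrReplaceTempView("ds1")

val result = spark.sql(
  """SELECT KEY,
    |       sum(SA) / sum(SA / (PCT / 100)) * 100 AS re_pcnt
    |FROM ds1
    |GROUP BY KEY""".stripMargin)
```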


3 Comments

So in this case, are all of the sum() calls performed within the group-by key, or only the last one?
@Dee: Not sure if I understand your question correctly, but the aggregation is performed separately for each key. If there are multiple keys in the input, then the output will have multiple rows.
Apologies, my question was very badly written. I've played around with this and realised what I want to do isn't possible, i.e. I'd like to do sum( log(price) * (quantity / sum(quantity)) ) or sum( log(price) * (quantity / sum(quantity) over (partition by id_col)) ), but I can't nest an aggregate or window function inside an aggregate function. In this case the outer sum()

Your code works almost perfectly. You just have to add the '$' symbol to specify that you're passing a column:

val result = DS1.groupBy($"KEY").agg(((sum($"SA").divide(
  sum(
    ($"SA").divide(
      ($"PCT").divide(100)
    )
  )
)) * 100).as("re_pcnt"))

Here's the output:

result.show()
+---+-------+                                                                   
|KEY|re_pcnt|
+---+-------+
| 01|   45.3|
+---+-------+

