As I understand it, you want the average of each column per flag value and the standard deviation of each column irrespective of the flag. You then apply a formula to those values to calculate rpb.
Following the same logic, I took some sample data and wrote the code without a loop. It should be faster than the loop logic you are using. Spark does not handle row-by-row loop logic well, so try to gather all the required data into one row (such as avg0, avg1 and StdDev in the example below) and then process it horizontally or in batch.
Please note, as I commented above, that I didn't understand the values of p and q, so I have left them out of the final output DataFrame logic. You can add them directly if they are variables declared earlier.
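The "one row" idea can also be taken literally with conditional aggregation. The following is only a minimal sketch of that variant, not the code I ran for the output in this answer: the names `sample`, `aggExprs` and `oneRow` and the `_avg1` / `_avg0` / `_stddev` column suffixes are made up for illustration, and it assumes `cust_flag` is the string "1" or "0" as in the sample data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, stddev, when}

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("one-row-sketch").getOrCreate()
import spark.implicits._

// Same five rows as the sample data in this answer, typed as Double up front.
val sample = Seq(
  (121.0, 442.0, 512.0, "1"),
  (134.0, 434.0, 752.0, "0"),
  (423.0, 312.0, 124.0, "1"),
  (432.0, 677.0, 752.0, "0"),
  (332.0, 424.0, 111.0, "1")
).toDF("col1", "col2", "col3", "cust_flag")

val cols = Seq("col1", "col2", "col3")

// Three aggregate expressions per column: mean where flag = 1, mean where flag = 0,
// and the overall sample standard deviation.
// when(...) without otherwise(...) yields null for non-matching rows, and avg skips nulls.
val aggExprs = cols.flatMap { c =>
  Seq(
    avg(when(col("cust_flag") === "1", col(c))).alias(s"${c}_avg1"),
    avg(when(col("cust_flag") === "0", col(c))).alias(s"${c}_avg0"),
    stddev(col(c)).alias(s"${c}_stddev")
  )
}

// A single aggregation that returns one row holding every statistic.
val oneRow = sample.agg(aggExprs.head, aggExprs.tail: _*)
oneRow.show()
```

The transcript that follows takes a different route (one small aggregate per column, then union and join) and ends up with one row per column instead of one wide row.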
scala> import org.apache.spark.sql.types._
scala> val df = Seq(
| ("121", "442", "512","1"),
| ("134", "434", "752","0"),
| ("423", "312", "124","1"),
| ("432", "677", "752","0"),
| ("332", "424", "111","1")).
| toDF("col1","col2","col3","cust_flag").
| withColumn("col1", $"col1".cast(DoubleType)).
| withColumn("col2", $"col2".cast(DoubleType)).
| withColumn("col3", $"col3".cast(DoubleType))
scala> df.show
+-----+-----+-----+---------+
| col1| col2| col3|cust_flag|
+-----+-----+-----+---------+
|121.0|442.0|512.0|        1|
|134.0|434.0|752.0|        0|
|423.0|312.0|124.0|        1|
|432.0|677.0|752.0|        0|
|332.0|424.0|111.0|        1|
+-----+-----+-----+---------+
scala> val colSeq = Seq("col1", "col2", "col3")
scala> val aggdf = colSeq.map(c => {
| df.groupBy("cust_flag").agg( lit(c).alias("columnName"), avg(c).cast("Decimal(14,2)").alias("avg"))
| })
scala> val devdf = colSeq.map(c => {
| df.agg( lit(c).alias("columnName"), stddev(c).cast("Decimal(14,2)").alias("StdDev"))
| })
scala> val avgDF = aggdf.reduce(_ union _)
scala> val stdDevDF = devdf.reduce(_ union _)
scala> val finalAvgDF = avgDF.filter(col("cust_flag") === 1).alias("1").join(avgDF.filter(col("cust_flag") === 0).alias("0"), List("columnName")).select(col("columnName"), col("1.avg").alias("avg1"), col("0.avg").alias("avg0"))
scala> val outDF = finalAvgDF.join(stdDevDF, List("columnName"))
scala> outDF.show()
+----------+------+------+------+
|columnName|  avg1|  avg0|StdDev|
+----------+------+------+------+
|      col1|292.00|283.00|152.07|
|      col2|392.67|555.50|133.48|
|      col3|249.00|752.00|319.16|
+----------+------+------+------+
// apply your final formula to get rpb
scala> outDF.withColumn("rpb", (col("avg1") - col("avg0"))/col("StdDev")).show
+----------+------+------+------+--------------------+
|columnName|  avg1|  avg0|StdDev|                 rpb|
+----------+------+------+------+--------------------+
|      col1|292.00|283.00|152.07| 0.05918327086210298|
|      col2|392.67|555.50|133.48|-1.21988312855858556|
|      col3|249.00|752.00|319.16|-1.57601203158290513|
+----------+------+------+------+--------------------+
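The rpb column above leaves out p and q. If rpb in your case is the point-biserial correlation (this is my assumption, your formula may differ), p and q would be the fractions of rows with flag 1 and flag 0, and the formula would be (avg1 - avg0) / stddev * sqrt(p * q), where the standard deviation is the population one (divide by n), not the sample one that `stddev` returns. Here is a plain-Scala sketch of that arithmetic for col1 of the sample data, with no Spark involved; the helper `mean` and the variable names are only illustrative.

```scala
// col1 and cust_flag from the sample data above.
val col1 = Seq(121.0, 134.0, 423.0, 432.0, 332.0)
val flag = Seq(1, 0, 1, 0, 1)

def mean(xs: Seq[Double]): Double = xs.sum / xs.size

val n = col1.size.toDouble
val group1 = col1.zip(flag).collect { case (x, 1) => x } // values where flag = 1
val group0 = col1.zip(flag).collect { case (x, 0) => x } // values where flag = 0

val p = group1.size / n // share of rows with flag 1
val q = 1.0 - p         // share of rows with flag 0

// Population standard deviation (divide by n), the one that pairs with sqrt(p * q).
val m = mean(col1)
val sdPop = math.sqrt(col1.map(x => (x - m) * (x - m)).sum / n)

// Assumed point-biserial form: difference of group means, scaled by sdPop and sqrt(p * q).
val rpb = (mean(group1) - mean(group0)) / sdPop * math.sqrt(p * q)
println(f"p=$p%.2f q=$q%.2f rpb=$rpb%.4f")
```

In Spark, the same adjustment would mean using `stddev_pop` in place of `stddev` and multiplying the rpb expression by the square root of p * q.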