
I want to achieve the following in Scala for a Spark DataFrame:

  1. For each column, select the column name and the flag variable (0 or 1)
  2. Find the mean of the column when flag = 0 and then when flag = 1
  3. Find the standard deviation of the column

I am not sure how to loop through the columns, selecting each column and the flag variable on each iteration of the loop. What I tried:

for (a <- colnames) {
  val dat1 = data.filter($"cust_flag".isin("1")).select(a)
  val dat0 = data.filter($"cust_flag".isin("0")).select(a)
  val m0 = dat1.select(avg(a)).asInstanceOf[Double]
  val m1 = dat0.select(avg(a)).asInstanceOf[Float]
  val stdev = data.agg(stddev(a)).asInstanceOf[Float]
  val rpb = ((m1 - m0) / stdev) * p * q
  println(rpb)
}

Now I am getting an error:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.Dataset cannot be cast to java.lang.Float

Comments:

  • can you provide the input dataframe and expected output?
  • edited the question to add the code
  • what are p and q? And also provide sample data if possible.

4 Answers


To create a column with a given name from a string, a simple way is to use:

import org.apache.spark.sql.{functions => sf}

df.select(sf.col(colName))

You can combine this in control logic (your loop) as you see fit.

If you want to know what columns are in the dataframe, use df.columns.
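
Putting the two together, here is a minimal sketch of the loop from the question (assuming, as there, a data DataFrame with a string cust_flag column and p and q defined beforehand). Note that select/agg return a Dataset, not a number, which is why the asInstanceOf casts fail; collect the single row and read the value out instead:

import org.apache.spark.sql.{functions => sf}

// assumes `data`, `p` and `q` from the question; every column except
// the flag is treated as numeric
for (a <- data.columns if a != "cust_flag") {
  val m1 = data.filter(sf.col("cust_flag") === "1").agg(sf.avg(a)).first().getDouble(0)
  val m0 = data.filter(sf.col("cust_flag") === "0").agg(sf.avg(a)).first().getDouble(0)
  val sd = data.agg(sf.stddev(a)).first().getDouble(0)
  println(s"$a: rpb = ${((m1 - m0) / sd) * p * q}") // the question's formula
}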


There are built-in functions for mean() and stddev().

Create two filtered datasets, one for flag = 0 and one for flag = 1:

val dfcol0 = df.filter(df("cust_flag") === "0")
val dfcol1 = df.filter(df("cust_flag") === "1")

Then use stddev() and mean() to get what is required:

dfcol0.select(stddev("colname")).show(false)
dfcol0.select(mean("colname")).show(false)
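
If you need the results as plain Scala values rather than printed tables, a minimal sketch (assuming the column is numeric):

import org.apache.spark.sql.functions.{mean, stddev}

// collect the single aggregate row and read the Double out
val mean0 = dfcol0.select(mean("colname")).first().getDouble(0)
val mean1 = dfcol1.select(mean("colname")).first().getDouble(0)
val sd    = df.select(stddev("colname")).first().getDouble(0)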


As I understand it, you are trying to get the average of each column according to the flag, and the standard deviation of each column irrespective of the flag. After that you apply your formula and calculate rpb.

Following the same logic, I have taken sample data and written the code without a loop. It will be faster than the loop logic you are using: Spark does not handle loop logic well, so try to get all the required data into one row (such as avg0, avg1 and StdDev in the example below) and then process it horizontally or in batch.

Please note, as I commented above, I did not understand the value of p and q, so I have ignored them in the final output DataFrame logic. You can add them directly if they are variables declared beforehand.

scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._
scala> val df = Seq(
     |   ("121",  "442",  "512","1"),
     |   ("134",  "434",  "752","0"),
     |   ("423",  "312",  "124","1"),
     |     ("432",  "677",  "752","0"),
     |   ("332",  "424",  "111","1")).
     |   toDF("col1","col2","col3","cust_flag").
     |   withColumn("col1", $"col1".cast(DoubleType)).
     |   withColumn("col2", $"col2".cast(DoubleType)).
     |   withColumn("col3", $"col3".cast(DoubleType))

scala> df.show
+-----+-----+-----+---------+
| col1| col2| col3|cust_flag|
+-----+-----+-----+---------+
|121.0|442.0|512.0|        1|
|134.0|434.0|752.0|        0|
|423.0|312.0|124.0|        1|
|432.0|677.0|752.0|        0|
|332.0|424.0|111.0|        1|
+-----+-----+-----+---------+

scala> val colSeq = Seq("col1", "col2", "col3")

scala> val aggdf = colSeq.map(c => {
     |   df.groupBy("cust_flag").agg(lit(c).alias("columnName"), avg(c).cast("Decimal(14,2)").alias("avg"))
     | })

scala> val devdf = colSeq.map(c => {
     |   df.agg(lit(c).alias("columnName"), stddev(c).cast("Decimal(14,2)").alias("StdDev"))
     | })

scala> val avgDF = aggdf.reduce(_ union _)

scala> val stdDevDF = devdf.reduce(_ union _)

scala> val finalAvgDF = avgDF.filter(col("cust_flag") === 1).alias("1").join(avgDF.filter(col("cust_flag") === 0).alias("0"), List("columnName")).select(col("columnName"), col("1.avg").alias("avg1"), col("0.avg").alias("avg0"))

scala> val outDF = finalAvgDF.join(stdDevDF, List("columnName"))

scala> outDF.show()
+----------+------+------+------+                                               
|columnName|  avg1|  avg0|StdDev|
+----------+------+------+------+
|      col1|292.00|283.00|152.07|
|      col2|392.67|555.50|133.48|
|      col3|249.00|752.00|319.16|
+----------+------+------+------+

// apply your final formula to get rpb
scala> outDF.withColumn("rpb", (col("avg1") - col("avg0"))/col("StdDev")).show
+----------+------+------+------+--------------------+                          
|columnName|  avg1|  avg0|StdDev|                 rpb|
+----------+------+------+------+--------------------+
|      col1|292.00|283.00|152.07| 0.05918327086210298|
|      col2|392.67|555.50|133.48|-1.21988312855858556|
|      col3|249.00|752.00|319.16|-1.57601203158290513|
+----------+------+------+------+--------------------+ 
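
If p and q are plain Double values declared beforehand (the question does not define them, so that is an assumption here), they can be folded into the same expression with lit():

scala> outDF.withColumn("rpb", (col("avg1") - col("avg0")) / col("StdDev") * lit(p) * lit(q)).show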


I would suggest you use df.selectExpr(), which can take a sequence of strings:

val expressions = Seq("avg(col1) as avg_col1", "stddev(col1) as sd_col1", "...")

df.selectExpr(expressions:_*)

You can do almost everything with that function, building the array of expressions as you wish in a for loop.
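
For example, a sketch that computes the per-flag averages and the overall standard deviation for every column in a single pass (assuming a cust_flag column as in the question; the column list is illustrative):

val cols = Seq("col1", "col2", "col3") // illustrative; df.columns also works

val expressions = cols.flatMap { c =>
  Seq(
    s"avg(CASE WHEN cust_flag = 1 THEN $c END) AS avg1_$c", // mean where flag = 1
    s"avg(CASE WHEN cust_flag = 0 THEN $c END) AS avg0_$c", // mean where flag = 0
    s"stddev($c) AS sd_$c"                                  // overall std dev
  )
}

df.selectExpr(expressions: _*).show(false)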

In any case, I suggest you show us an example of the expected input/output (the code you wrote doesn't tell us much).
