
I have a dataframe like this:

    val df = Seq(
      ("a", Seq(2.0)),
      ("a", Seq(1.0)),
      ("a", Seq(0.5)),
      ("b", Seq(24.0)),
      ("b", Seq(12.5)),
      ("b", Seq(6.4)),
      ("b", Seq(3.2)),
      ("c", Seq(104.0)),
      ("c", Seq(107.4))
    ).toDF("key", "value")

I need to apply an algorithm that takes a DataFrame as input, separately on each distinct group. To make this clearer, assume I have to apply StandardScaler scaling by group.

In pandas I would do something like this (with many type conversions along the way):

    from sklearn.preprocessing import StandardScaler

    df.groupby('key') \
      .value \
      .transform(lambda x: StandardScaler()
                 .fit_transform(x.values.reshape(-1, 1))
                 .reshape(-1))

I need to do this in scala because the algorithm I need to use is not the Scaler but another thing built in scala.

So far I've tried to do something like this:

    import org.apache.spark.ml.feature.StandardScaler

    def f(X: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
      val scaler = new StandardScaler()
        .setInputCol("value")
        .setOutputCol("scaled")

      val output = scaler.fit(X)("scaled")

      output
    }

    df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))

but of course it gives me an error:

    command-144174313464261:21: error: type mismatch;
     found   : org.apache.spark.sql.Column
     required: org.apache.spark.sql.Dataset[_]
           val output = scaler.fit(X)("scaled")

So I'm trying to transform a single Column object into a DataFrame object, without success. How do I do it?

If it's not possible, is there any workaround to solve this?

UPDATE 1

It seems I made some mistakes in the code; I tried to fix them (I think I got it right this time):

    val df = Seq(
      ("a", 2.0),
      ("a", 1.0),
      ("a", 0.5),
      ("b", 24.0),
      ("b", 12.5),
      ("b", 6.4),
      ("b", 3.2),
      ("c", 104.0),
      ("c", 107.4)
    ).toDF("key", "value")


    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

    def f(X: org.apache.spark.sql.DataFrame): org.apache.spark.sql.Column = {
      val assembler = new VectorAssembler()
        .setInputCols(Array("value"))
        .setOutputCol("feature")
      val scaler = new StandardScaler()
        .setInputCol("feature")
        .setOutputCol("scaled")
      val pipeline = new Pipeline()
        .setStages(Array(assembler, scaler))
      val output = pipeline.fit(X).transform(X)("scaled")

      output
    }

    df.withColumn("scaled_values", f(df).over(Window.partitionBy("key")))

I still get an error:

org.apache.spark.sql.AnalysisException: Expression 'scaled#1294' not supported within a window function.;;

I am not sure about the reason for this error; I tried aliasing the column, but it doesn't seem to work.

1 Answer

So I'm trying to transform a single Column object into a DataFrame object, without success. How do I do it?

You can't: a Column merely references a column of a DataFrame; it does not contain any data and is not a data structure like a DataFrame.
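
As a small illustration, a Column can even be constructed without any DataFrame at all; it is just an unresolved expression that only produces data once it is evaluated against some DataFrame (the snippet below reuses the df from the question):

import org.apache.spark.sql.functions.col

// just an expression; no data attached, not tied to any particular DataFrame yet
val doubled = col("value") * 2

// it only produces values when evaluated against a DataFrame that has a "value" column
df.select(doubled.as("value_x2")).show()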

Your f function will also not work like this. If you want to create a custom function to be used with a Window, then you need a UDAF (user-defined aggregate function), which is pretty hard to write...

In your case, I would do a groupBy on key, collect_list of your values, then apply a UDF to do the scaling. Note that this only works if the data per key is not too large (i.e., it fits into one executor); otherwise you need a UDAF.

Here is an example:

import org.apache.spark.sql.functions.{arrays_zip, collect_list, explode, udf}
// the $"..." syntax requires spark.implicits._ in scope (imported automatically in spark-shell / notebooks)

// example scala method, scale to 0-1 (min-max scaling)
def myScaler(data: Seq[Double]) = {
  val mi = data.min
  val ma = data.max
  data.map(x => (x - mi) / (ma - mi))
}

val udf_myScaler = udf(myScaler _)

df
  .groupBy($"key")
  .agg(
    collect_list($"value").as("values")
  )
  .select($"key", explode(arrays_zip($"values", udf_myScaler($"values"))))
  .select($"key", $"col.values", $"col.1".as("values_scaled"))
  .show()

gives:

+---+------+-------------------+
|key|values|      values_scaled|
+---+------+-------------------+
|  c| 104.0|                0.0|
|  c| 107.4|                1.0|
|  b|  24.0|                1.0|
|  b|  12.5|0.44711538461538464|
|  b|   6.4|0.15384615384615385|
|  b|   3.2|                0.0|
|  a|   2.0|                1.0|
|  a|   1.0| 0.3333333333333333|
|  a|   0.5|                0.0|
+---+------+-------------------+
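
If the transformation really is standard scaling, there is also a UDF-free sketch worth mentioning (assuming built-in aggregates are enough for your case): with Window.partitionBy and no ordering, an aggregate's frame spans the whole partition, so the per-key mean and standard deviation can be computed directly. Note that sklearn's StandardScaler divides by the population standard deviation while Spark ML's StandardScaler uses the sample standard deviation, so pick stddev_pop or stddev_samp accordingly.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, stddev_pop}

// standard scaling per key using built-in window aggregates (no UDF, no collect_list)
val w = Window.partitionBy("key")

val scaled = df.withColumn(
  "value_scaled",
  (col("value") - avg("value").over(w)) / stddev_pop("value").over(w)
)

scaled.show()

This of course only covers transformations that decompose into per-group aggregates; it does not help with an arbitrary fitted model.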

1 Comment

Thank you for the reply! I tried your code, and I get this error: org.apache.spark.SparkException: Task not serializable. I also have this problem: I am trying to run an anomaly detection algorithm (IsolationForest) on the various groups, so it's quite difficult for me to rewrite all of the algorithm's code as a function that ingests a list of values. It seems to me that, unless I do that, I won't be able to complete this task. Furthermore, if I have to put everything into one single executor, don't I lose the benefit of Spark's parallelism?
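
For the case raised in the comment, where the per-group step is itself a Spark ML estimator (such as an IsolationForest implementation) rather than a simple function over a list, one possible workaround is to drive the loop over groups from the driver: collect the distinct keys, filter the DataFrame per key, fit and transform on each sub-DataFrame, and union the results. The sketch below is an assumption-laden illustration, not the answer's method; it uses the VectorAssembler + StandardScaler pipeline from the question's UPDATE as a stand-in for the real estimator, and the per-key loop runs sequentially, so it is only reasonable when the number of groups is modest.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// stand-in pipeline; swap in the actual per-group estimator (e.g. the IsolationForest implementation)
val assembler = new VectorAssembler().setInputCols(Array("value")).setOutputCol("feature")
val scaler = new StandardScaler().setInputCol("feature").setOutputCol("scaled")
val pipeline = new Pipeline().setStages(Array(assembler, scaler))

// collect the distinct keys on the driver, fit one model per group, union the outputs
val keys = df.select("key").distinct.collect().map(_.getString(0))

val perGroup: DataFrame = keys
  .map { k =>
    val group = df.filter(col("key") === k)
    pipeline.fit(group).transform(group)
  }
  .reduce(_ union _)

perGroup.show()

Each group is still processed as a DataFrame across the cluster, so the algorithm does not need to be rewritten as a function over a list; only the iteration over keys is serial.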
