I have a DataFrame like this:
val df = Seq(
  ("a", Seq(2.0)),
  ("a", Seq(1.0)),
  ("a", Seq(0.5)),
  ("b", Seq(24.0)),
  ("b", Seq(12.5)),
  ("b", Seq(6.4)),
  ("b", Seq(3.2)),
  ("c", Seq(104.0)),
  ("c", Seq(107.4))
).toDF("key", "value")
I need to apply an algorithm that takes a DataFrame as input, separately on each distinct group. To make this clearer, assume that I have to apply StandardScaler scaling by groups.
In pandas I would do something like this (with several type conversions along the way):
from sklearn.preprocessing import StandardScaler

df.groupby("key").value.transform(
    lambda x: StandardScaler().fit_transform(x.values.reshape(-1, 1)).reshape(-1)
)
I need to do this in Scala because the algorithm I actually have to use is not the scaler but something else built in Scala.
So far I've tried to do something like this:
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

def f(X: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
  val scaler = new StandardScaler()
    .setInputCol("value")
    .setOutputCol("scaled")
  val output = scaler.fit(X)("scaled")
  output
}

df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))
but of course it gives me an error:
command-144174313464261:21: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: org.apache.spark.sql.Dataset[_]
       val output = scaler.fit(X)("scaled")
So I'm trying to transform a single Column object into a DataFrame object, without success. How do I do it?
If it's not possible, is there any workaround to solve this?
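For the scaler specifically I know there is a manual workaround: compute the per-group statistics with window aggregates and standardise by hand. A rough sketch of that idea (assuming a plain Double value column instead of the Seq wrapping above, and using stddev_pop to mimic sklearn's StandardScaler):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, stddev_pop}

// Per-group (x - mean) / std using window aggregates.
// This only works because the scaler is simple arithmetic; it does not help
// with an algorithm that needs a whole DataFrame per group.
val w = Window.partitionBy("key")
val manuallyScaled = df.withColumn(
  "scaled_values",
  (col("value") - avg(col("value")).over(w)) / stddev_pop(col("value")).over(w)
)

But this doesn't generalise to the actual algorithm I need to run, which consumes a whole DataFrame per group.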
UPDATE 1
It seems I made some mistakes in the code, so I tried to fix them (I think I got it right this time):
val df = Seq(
  ("a", 2.0),
  ("a", 1.0),
  ("a", 0.5),
  ("b", 24.0),
  ("b", 12.5),
  ("b", 6.4),
  ("b", 3.2),
  ("c", 104.0),
  ("c", 107.4)
).toDF("key", "value")
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.sql.expressions.Window

def f(X: org.apache.spark.sql.DataFrame): org.apache.spark.sql.Column = {
  val assembler = new VectorAssembler()
    .setInputCols(Array("value"))
    .setOutputCol("feature")
  val scaler = new StandardScaler()
    .setInputCol("feature")
    .setOutputCol("scaled")
  val pipeline = new Pipeline()
    .setStages(Array(assembler, scaler))
  val output = pipeline.fit(X).transform(X)("scaled")
  output
}

df.withColumn("scaled_values", f(df).over(Window.partitionBy("key")))
I still get an error:
org.apache.spark.sql.AnalysisException: Expression 'scaled#1294' not supported within a window function.;;
I am not sure what is causing this error; I tried aliasing the column, but that doesn't seem to help either.
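For reference, the aliasing attempt looked roughly like this (reconstructed from memory, so the exact form may differ), and it still doesn't get past the window restriction:

import org.apache.spark.sql.expressions.Window

// Aliasing the scaled column before using it over the window —
// the analyzer still rejects the expression inside the window function.
df.withColumn(
  "scaled_values",
  f(df).alias("scaled_alias").over(Window.partitionBy("key"))
)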