I'm trying to convert my Spark application, which is written in Java, to Scala. Since I'm new to Scala and to Spark's Scala API, I don't know how to write this "transformToPair" function in Scala:
Java:
JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);
*** FUNCTION ***
private static Function<JavaPairRDD<String, Float>, JavaPairRDD<String, Boolean>> findOutliersPerComparisonFunction =
        new Function<JavaPairRDD<String, Float>, JavaPairRDD<String, Boolean>>() {
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {
        // Look up the overall average, stored under the key "all" (0.0f if the batch is empty)
        float avgOfAll;
        if (v1.count() > 0) {
            avgOfAll = v1.filter(new Function<Tuple2<String, Float>, Boolean>() {
                public Boolean call(Tuple2<String, Float> v1) throws Exception {
                    return v1._1().equals("all");
                }
            }).values().collect().get(0);
        } else {
            avgOfAll = 0.0f;
        }
        final float finalAvg = avgOfAll;
        // Flag every key whose average exceeds the overall average
        JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
            public Boolean call(Float v1) throws Exception {
                return v1 > finalAvg;
            }
        });
        // Drop the "all" entry itself from the result
        return rddBool.filter(new Function<Tuple2<String, Boolean>, Boolean>() {
            public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                return !v1._1().equals("all");
            }
        });
    }
};
Here is my attempt in Scala:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform { rdd =>
  var avgOfAll = 0.0
  if (rdd.count() > 0) {
    avgOfAll = rdd.filter { case (k, v) => k == "all" }.map { case (k, v) => v }.collect()(0)
  }
  val finalAvg = avgOfAll
  val rddBool = rdd.map { case (k, v) => (k, v > finalAvg) }
  val rddNew = rddBool.filter { case (k, v) => k != "all" }
}
I get the following error message:
<console>:281: error: type mismatch;
found : Unit
required: org.apache.spark.rdd.RDD[?]
}
^
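If I understand the error correctly, the block evaluates to Unit because its last statement is a val definition, which is a statement rather than an expression. A small (made-up) example of what I mean:

val a = { val x = 1 }     // a: Unit, because a val definition is not an expression
val b = { val x = 1; x }  // b: Int = 1, because a block's value is its last expression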
Can someone help me? How can I return "rddNew" from the "transform" function?
If I say
return rddNew
at the end of the "transform" function, I get the following error:
<console>:293: error: return outside method definition
return rddNew
^
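From what I've read, "return" is only allowed inside a "def", and a block already evaluates to its last expression, so I suspect I just have to end the closure with the RDD itself. Here is a sketch of what I mean (assuming "avgAll1h" and "avgPerPlug1h" are of type DStream[(String, Float)]):

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform { rdd =>
  var avgOfAll = 0.0f
  if (rdd.count() > 0) {
    avgOfAll = rdd.filter { case (k, v) => k == "all" }.map { case (k, v) => v }.collect()(0)
  }
  val finalAvg = avgOfAll
  val rddBool = rdd.map { case (k, v) => (k, v > finalAvg) }
  rddBool.filter { case (k, v) => k != "all" }  // last expression: this becomes the closure's result
}

Is that the idiomatic way to do it in Scala?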