
I'm trying to "convert" my Spark application, which is written in Java, into Scala. Because I'm new to Scala and to Spark's Scala API, I don't know how to write this "transformToPair" function in Scala:

Java:

JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);

*** FUNCTION ***

private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {

        // Look up the overall average stored under the "all" key (0.0f if the batch is empty)
        float avgOfAll;
        if(v1.count() > 0) {
            avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
                public Boolean call(Tuple2<String, Float> v1) throws Exception {
                    return v1._1().equals("all");
                }
            }).values().collect().get(0);
        } else {
            avgOfAll = 0.0f;
        }

        final float finalAvg = avgOfAll;

        // Flag each entry whose value lies above the overall average
        JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
            public Boolean call(Float v1) throws Exception {
                return v1 > finalAvg;
            }
        });

        // Drop the "all" entry itself before returning
        return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
            public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                return !v1._1().equals("all");
            }
        });
    }
};

Here is my attempt in Scala:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}

I get the following error message:

<console>:281: error: type mismatch;
 found   : Unit
 required: org.apache.spark.rdd.RDD[?]
       }
       ^

Can someone help me? How can I return the "rddNew" RDD from the "transform" function?

If I say

return rddNew

at the end of the "transform" function, I get the following error:

<console>:293: error: return outside method definition
       return rddNew
       ^

1 Answer

In Scala the value of a block is its last expression, and the "return" keyword is only allowed inside a method defined with "def". So you just have to make the value you want the last expression of the closure, e.g. like this:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})

  rddNew
}

Or simply skip defining the variable altogether:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  rddBool.filter({case(k, v) => (k != "all")})
}

A more Scala-like version would be:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 

  val finalAvg = if(rdd.count() > 0) {
    rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  } else { 0.0 }

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  rddBool.filter({case(k, v) => (k != "all")})
}
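
If, as in the Java version, the values are Floats, a further simplification is possible. This is just a sketch, assuming avgAll1h.union(avgPerPlug1h) is a DStream[(String, Float)] and that the "all" key occurs at most once per batch: it uses lookup instead of filter/collect, so the extra count() pass is no longer needed.

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform { rdd =>
  // lookup returns the values stored under the "all" key; fall back to 0.0f
  // when the batch is empty or the key is missing
  val finalAvg = rdd.lookup("all").headOption.getOrElse(0.0f)

  rdd.mapValues(_ > finalAvg)               // true = above the overall average
     .filter { case (k, _) => k != "all" }  // drop the "all" entry itself
}

Since lookup("all") returns an empty Seq when the key is absent, headOption.getOrElse covers both the empty-batch and missing-key cases in one expression.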

1 Comment

This did it for me. I just used the line "rddNew" without the "return" keyword, and it works now! You saved my day, thank you!
