
I've added the following code:

    var counters: Map[String, Int] = Map()
    val results = rdd
      .filter(l => l.contains("xyz"))
      .map(l => mapEvent(l))
      .filter(r => r.isDefined)
      .map { i =>
        val date = i.get.getDateTime.toString.substring(0, 10)
        counters = counters.updated(date, counters.getOrElse(date, 0) + 1)
      }

I want to get counts for the different dates in the RDD in a single iteration. But when I run this I get a message saying:

No implicits found for parameters evidence$6: Encoder[Unit]

So I added this line:

  implicit val myEncoder: Encoder[Unit] = org.apache.spark.sql.Encoders.kryo[Unit]

But then I get this error:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at com.xyz.SparkBatchJob.main(SparkBatchJob.scala)
Caused by: java.lang.UnsupportedOperationException: Primitive types are not supported.
    at org.apache.spark.sql.Encoders$.genericSerializer(Encoders.scala:200)
    at org.apache.spark.sql.Encoders$.kryo(Encoders.scala:152)

How do I fix this? Or is there a better way to get the counts I want in a single iteration (O(N) time)?

  • Don't put a mutable variable inside the anonymous function of the transformation; just collect the result of the first transformation: i.get.getDateTime.toString.substring(0, 10) Commented Mar 15, 2021 at 17:51
  • @EmiCareOfCell44 - Not sure what you mean. Example please. Thanks. Commented Mar 15, 2021 at 20:16

1 Answer


A Spark RDD is a representation of a distributed collection. When you apply a map function to an RDD, the function you use to manipulate the collection is executed across the cluster, so it makes no sense to mutate a variable that was created outside the scope of the map function.

In your code, the problem is that you don't return any value; instead you are trying to mutate a structure, and for that reason the compiler infers that the RDD created by the transformation is an RDD[Unit].

If you need to create a Map as the result of a Spark action, you should create a pair RDD and then apply a reduce operation such as reduceByKey (see the sketch below).

If you include the type of the RDD and the mapEvent function, I can show how it could be done for your specific case.

Spark builds a DAG from the transformations and the action; it does not process the data twice.
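
For illustration, here is a minimal sketch of that pair-RDD approach. It reuses rdd and mapEvent from the question and assumes mapEvent returns an Option of an event that exposes getDateTime, as shown there; the counting itself is done by reduceByKey, so the input is traversed only once:

    // Build (date, 1) pairs and let Spark combine them per key.
    // rdd and mapEvent are taken from the question; the event type is assumed
    // to expose getDateTime exactly as in the original snippet.
    val countsByDate: Map[String, Int] = rdd
      .filter(l => l.contains("xyz"))
      .map(l => mapEvent(l))
      .filter(r => r.isDefined)
      .map(i => (i.get.getDateTime.toString.substring(0, 10), 1)) // key by the date prefix
      .reduceByKey(_ + _)   // sums the 1s per date on the executors
      .collect()            // only the small (date, count) pairs reach the driver
      .toMap

Because reduceByKey combines the partial counts per date on the executors before the small result is collected, this stays a single O(N) pass over the data and avoids mutating any driver-side state.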


1 Comment

Thanks. That makes sense but then what's the answer to the 2nd part of my question: "Is there a better way to get the counts in a single iteration?"
