
I'm trying to create a Dataset from an RDD of type T, which is known to be a case class, passed as a parameter of my function. The problem is that implicit Encoders do not apply here. How should I set my type parameter to be able to create a Dataset?

I've tried declaring T as T: ClassTag and using an implicit ClassTag, but neither helped. If I use this code with a concrete type instead, it works, so there is no problem with the specific class I want to pass (a basic case class).

In my use case, I do other things in the function, but here is the basic problem.

def createDatasetFromRDD[T](rdd: RDD[T])(implicit tag: ClassTag[T]): Dataset[T] = {
  // Convert RDD to Dataset
  sparkSession.createDataset(rdd)  // fails to compile: no implicit Encoder[T] in scope
}

I get the error message:

error: Unable to find encoder for type T. An implicit Encoder[T] is needed to store T instances in a Dataset.
 Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

Any help or suggestions?

EDIT:

T is known to be a case class. I know case classes can use the product Encoder, so I basically want to let Scala know it can use that one. Kryo sounds good, but it does not provide the advantages of the product Encoder.

  • Why don't you ask for an Encoder[T] instead? Commented Aug 1, 2019 at 15:09
  • An implicit Encoder exists for case classes, so that I don't have to provide one. The problem here is that T is not identified as a case class (but it is!), so the Encoder is not provided automatically. My question is about not having to provide my own custom encoder, and using the type parameter in a way that allows Spark to provide the product Encoder. If you have another solution than kode's, I'd be interested to hear about it, so don't hesitate to answer; I'm pretty sure there are multiple ways to do this! Commented Aug 2, 2019 at 8:09
  • I was about to type the same answer you had. Anyway, as I said, you can simply do def f[T: Encoder](rdd: RDD[T]): Dataset[T], and the caller is the one with the responsibility of having the encoder in implicit scope; if T is a case class, a simple import spark.implicits._ will be enough. And if not, then the user is the one who has to provide the Kryo encoder. Commented Aug 2, 2019 at 13:03

2 Answers


I searched and found a solution that avoids Kryo when you know the product Encoder should be enough.

TLDR

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders}

def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T] = {
  // Convert RDD to Dataset, passing the product encoder explicitly
  sparkSession.createDataset(rdd)(Encoders.product[T])
}

Explanation:

Kryo has some disadvantages that are explained here. Instead, why not use the product encoder, which is the one Spark actually uses for case classes?

So if I try:

  sparkSession.createDataset(rdd)(Encoders.product[T])

I get the error type arguments [T] do not conform to method product's type parameter bounds [T <: Product]. Alright then, let's add the Product bound:

def createDatasetFromRDD[T <: Product](rdd: RDD[T]): Dataset[T]

Now I get No TypeTag available for T. That's OK, let's add a TypeTag!

def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T]

And that's it! Now you can pass a case class type to this function and the product Encoder will be used without any extra code. If your class doesn't satisfy [T <: Product], then you may want to look into kode's answer.
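For illustration, here is a hypothetical call site (Person, the sample data, and sparkSession are made-up names, not from the original post):

case class Person(name: String, age: Int)  // hypothetical case class

val rdd = sparkSession.sparkContext.parallelize(Seq(Person("Ada", 36)))

// T is inferred as Person, which satisfies T <: Product,
// so Encoders.product[Person] is used with no extra code at the call site.
val ds: Dataset[Person] = createDatasetFromRDD(rdd)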

EDIT

As commented by Luis Miguel Mejía Suárez, another solution is to provide an encoder like this:

def createDatasetFromRDD[T : Encoder](rdd: RDD[T]): Dataset[T]

and the caller is the one with the responsibility of having the encoder in implicit scope; if T is a case class, a simple import spark.implicits._ will be enough. And if not, then the user is the one who has to provide the Kryo encoder.
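For example, a hypothetical call site for this version (Person and spark are assumed names):

case class Person(name: String, age: Int)  // hypothetical case class

// The caller brings the encoder into implicit scope:
import spark.implicits._
val ds = createDatasetFromRDD(spark.sparkContext.parallelize(Seq(Person("Ada", 36))))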



This article has a nice explanation that helps you understand the compiler error, as well as a solution to fix the problem. It covers the what, why, and how.

In short, this should fix your problem:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Fallback encoder: serializes any T with Kryo into a single binary column.
implicit def kryoEncoder[T](implicit ct: ClassTag[T]): Encoder[T] = Encoders.kryo[T](ct)

// Note the ClassTag context bound: without it, kryoEncoder cannot be resolved for T.
def createDatasetFromRDD[T: ClassTag](rdd: RDD[T]): Dataset[T] = {
    // Convert RDD to Dataset
    sparkSession.createDataset(rdd)
}
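As a rough sketch of what this buys you, here is a hypothetical call with a class that is not a Product (Sensor is a made-up name):

class Sensor(val id: String)  // a plain class, so the product encoder would not apply

val ds: Dataset[Sensor] =
  createDatasetFromRDD(sparkSession.sparkContext.parallelize(Seq(new Sensor("a"))))
// This compiles thanks to the implicit kryoEncoder, but the resulting Dataset
// stores each Sensor in a single binary column rather than one column per field.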

1 Comment

I'm pretty sure this is the 100th time I've read this article, but when I need it, I've forgotten about it... Thanks a lot! Still, the Kryo Encoder has limitations (see the tuple workaround in the article), and it's quite painful when you know a product Encoder exists. This is a good answer, but I hope there is a better way... I'll look into it!
