
Spark 2.0 (final) with Scala 2.11.8. The following super simple code yields the compilation error:

Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}

3 Answers


Spark Datasets require an Encoder for the data type that is about to be stored. For common types (atomics, product types) a number of predefined encoders are available, but you have to import them first from SparkSession.implicits to make it work:

import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession = ???  // placeholder: obtain a real session via SparkSession.builder
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)

Alternatively, you can directly provide an explicit

import org.apache.spark.sql.{Encoder, Encoders}

val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])

or an implicit

implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)

Encoder for the stored type.

Note that the Encoders object also provides a number of predefined Encoders for atomic types, and Encoders for complex ones can be derived with ExpressionEncoder.
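For example, here is a minimal sketch of deriving such an encoder; the Nested case class is hypothetical and not part of the question:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Hypothetical top-level case class that nests the question's SimpleTuple.
case class Nested(tuple: SimpleTuple, tags: Seq[String])

// ExpressionEncoder derives an Encoder[Nested] from the type's structure.
// It relies on a TypeTag, so Nested must be a top-level definition
// (see the next answer for why nesting it inside an object fails).
implicit val nestedEnc: Encoder[Nested] = ExpressionEncoder[Nested]()

val nestedDataset = sparkSession.createDataset(Seq(Nested(SimpleTuple(5, "abc"), Seq("x"))))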



2 Comments

Maybe this is obvious to Scala developers, but I know more Spark than Scala... Why is using = ??? okay, and/or better than actually declaring it?
@DanCiborowski-MSFT ??? throws NotImplementedError; in other words, fill in the blank according to your requirements.
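As a quick illustration (a hypothetical snippet, not from the thread): ??? comes from scala.Predef and has result type Nothing, so it type-checks as a value of any type and only fails if the line is actually executed:

// Compiles, because ??? (roughly: def ??? : Nothing = throw new NotImplementedError)
// can stand in for a value of any type.
def notYetWritten(): Int = ???

// Throws scala.NotImplementedError the moment it runs.
notYetWritten()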

For other users (yours is correct), note that it's also important that the case class is defined outside of the object scope. So:

Fails:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    val dataset = sparkSession.createDataset(dataList)
  }
}

Add the implicits, still fails with the same error:

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

Works:

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

Here's the relevant bug: https://issues.apache.org/jira/browse/SPARK-13540, so hopefully it will be fixed in the next release of Spark 2.

(Edit: Looks like that bugfix is actually in Spark 2.0.0... So I'm not sure why this still fails).

5 Comments

This still seems to be an issue in Spark 2.4
Worked for me though!
Very important: the case class must be defined outside of the object scope. Thanks!
For Spark 3.0.1, it still fails, and the case class needs to be outside the object scope.
For those having this error using Zeppelin: declaring the case classes in one paragraph and creating the Dataset in the next one will solve the issue. I had the error when I made both calls in the same paragraph.

I'd clarify, with an answer to my own question, that if the goal is to define a simple literal Spark DataFrame, rather than use Scala tuples and implicit conversion, the simpler route is to use the Spark API directly, like this:

  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.collection.JavaConverters._

  val simpleSchema = StructType(
    StructField("a", StringType) ::
    StructField("b", IntegerType) ::
    StructField("c", IntegerType) ::
    StructField("d", IntegerType) ::
    StructField("e", IntegerType) :: Nil)

  val data = List(
    Row("001", 1, 0, 3, 4),
    Row("001", 3, 4, 1, 7),
    Row("001", null, 0, 6, 4),
    Row("003", 1, 4, 5, 7),
    Row("003", 5, 4, null, 2),
    Row("003", 4, null, 9, 2),
    Row("003", 2, 3, 0, 1)
  )

  val df = spark.createDataFrame(data.asJava, simpleSchema)
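If you then want the typed Dataset the question asked for, .as[...] can view the same DataFrame as a Dataset. A minimal sketch, assuming a hypothetical matching top-level case class (Option[Int] accommodates the nulls above) and that spark is the same SparkSession used to build df:

  // Hypothetical case class mirroring simpleSchema, field for field.
  case class SimpleRecord(a: String, b: Option[Int], c: Option[Int], d: Option[Int], e: Option[Int])

  import spark.implicits._
  val ds: Dataset[SimpleRecord] = df.as[SimpleRecord]  // typed view over the same rows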

1 Comment

The goal was to create a Dataset, not a DataFrame.
