0

Following test for Dataset comparison is failing with the error:

Error:(55, 38) Unable to find encoder for type org.apache.spark.sql.Dataset[(String, Long)]. An implicit Encoder[org.apache.spark.sql.Dataset[(String, Long)]] is needed to store org.apache.spark.sql.Dataset[(String, Long)] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]
Error:(55, 38) not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[org.apache.spark.sql.Dataset[(String, Long)]])org.apache.spark.sql.Dataset[org.apache.spark.sql.Dataset[(String, Long)]].
Unspecified value parameter evidence$2.
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]

Test

As you can see, I tried creating the Kryo Encoder for (String, Long)

class WordCountDSAppTestSpec extends FlatSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  "countWords" should "return count of each word" in {

    val wordsDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("three Three", "three"),
      ("three Three", "Three"),
      ("", "")
    ).toDF("line", "word").as[LineAndWord]

    implicit val tupleEncoder = org.apache.spark.sql.Encoders.kryo[(String, Long)]
    val expectedDF = Seq(
      ("one", 1L),
      ("two", 1L),
      ("three", 2L)
    ).toDF("lower(word)", "count").as[Dataset[(String, Long)]]

    val actualDF = WordCountDSApp.countWords(wordsDF)

    assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
  }
}

Spark App under test

import com.aravind.oss.Logging
import com.aravind.oss.eg.wordcount.spark.WordCountUtil.{WhitespaceRegex, getClusterCfg, getPaths, getSparkSession}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{explode, split}

object WordCountDSApp extends App with Logging {
  logInfo("WordCount with Dataset API and multiple Case classes")
  val paths = getPaths(args)
  val cluster = getClusterCfg(args)

  if (paths.size > 1) {
    logInfo("More than one file to process")
  }
  logInfo("Path(s): " + paths)
  logInfo("Cluster: " + cluster)

  val spark = getSparkSession("WordCountDSApp", cluster)

  import spark.implicits._

  /*
  case class <code>Line<code> SHOULD match the number of columns in the input file
   */
  val linesDs: Dataset[Line] = spark.read
    .textFile(paths: _*)
    .toDF("line")
    .as[Line]
  logInfo("Dataset before splitting line")
  linesDs.show(false)

  /*
  <code>toWords<code> adds additional column (word) to the output so we need a
  new case class <code>LineAndWord</code> that contains two properties to represent two columns.
  The names of the properties should match the name of the columns as well.
   */
  val wordDs: Dataset[LineAndWord] = toWords(linesDs)
  logInfo("Dataset after splitting the line into words")
  wordDs.show(false)

  val wordCount = countWords(wordDs)
  wordCount
    .orderBy($"count(1)".desc)
    .show(false)

  def toWords(linesDs: Dataset[Line]): Dataset[LineAndWord] = {
    import linesDs.sparkSession.implicits._
    linesDs
      .select($"line",
        explode(split($"line", WhitespaceRegex)).as("word"))
      .as[LineAndWord]
  }

  def countWords(wordsDs: Dataset[LineAndWord]): Dataset[(String, Long)] = {
    import wordsDs.sparkSession.implicits._
    val result = wordsDs
      .filter(_.word != null)
      .filter(!_.word.isEmpty)
      .groupByKey(_.word.toLowerCase)
      .count()

    result
  }

  case class Line(line: String)

  case class LineAndWord(line: String, word: String)

}

1 Answer 1

1

You should call as[Something], not .as[Dataset[Something]]. Here is working version:


"countWords" should "return count of each word" in {
  import org.apache.spark.sql.{Encoder, Encoders}
  import spark.implicits._
  implicit def tuple2[A1, A2](implicit e1: Encoder[A1],
                              e2: Encoder[A2]): Encoder[(A1, A2)] =
    Encoders.tuple[A1, A2](e1, e2)

  val expectedDF = Seq(("one", 1L), ("two", 1L), ("three", 2L))
    .toDF("value", "count(1)")
    .as[(String, Long)]

  val wordsDF1 = Seq(
    ("one", "one"),
    ("two", "two"),
    ("three Three", "three"),
    ("three Three", "Three"),
    ("", "")
  ).toDF("line", "word").as[LineAndWord]

  val actualDF = WordCountDSApp.countWords(wordsDF1)
  actualDF.show()
  expectedDF.show()

  assertSmallDatasetEquality(actualDF, expectedDF, orderedComparison = false)
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.