
I want to convert an RDD to a DataFrame using StructType, but the item "Broken,Line," causes an error. Is there an elegant way to process records like this? Thanks.

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

val mySchema = StructType(Array(
  StructField("colA", StringType, true),
  StructField("colB", StringType, true),
  StructField("colC", StringType, true)))
val x = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")
val inputx = sc.parallelize(x).
  map((x: String) => Row.fromSeq(x.split(",").slice(0, mySchema.size).toSeq))
val df = spark.createDataFrame(inputx, mySchema)
df.show

The error looks like this:

Name: org.apache.spark.SparkException Message: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 2

I'm using:

  • Spark: 2.2.0
  • Scala: 2.11.8

And I ran the code in spark-shell.


1 Answer


Row.fromSeq, together with the schema you apply afterwards, is what throws the error you are getting. The third element in your list, "Broken,Line,", splits into only 2 fields (Scala's split drops trailing empty strings), so it can't become a three-element Row unless you add a null (or some placeholder) in place of the missing value.

When creating your DataFrame, Spark expects 3 elements per Row to match the schema against, hence the error.
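To illustrate that padding idea, here is a minimal sketch reusing x and mySchema from your question; it pads with empty strings, but you could pad with null instead if you want genuine missing values:

import org.apache.spark.sql.Row

// slice trims rows that are too long, padTo extends rows that are too short,
// so every Row ends up with exactly mySchema.size fields.
val padded = sc.parallelize(x).map { s =>
  Row.fromSeq(s.split(",").slice(0, mySchema.size).padTo(mySchema.size, "").toSeq)
}
spark.createDataFrame(padded, mySchema).show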

A quick and dirty solution is to use scala.util.Try to fetch each field separately:

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import scala.util.Try

val mySchema = StructType(Array(
  StructField("colA", StringType, true),
  StructField("colB", StringType, true),
  StructField("colC", StringType, true)))

val l = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")

val rdd = sc.parallelize(l).map { x =>
  val fields = x.split(",").slice(0, mySchema.size)
  val f1 = Try(fields(0)).getOrElse("")
  val f2 = Try(fields(1)).getOrElse("")
  val f3 = Try(fields(2)).getOrElse("")
  Row(f1, f2, f3)
}

val df = spark.createDataFrame(rdd, mySchema)

df.show
// +------+-----+----+
// |  colA| colB|colC|
// +------+-----+----+
// | 97573|Start| eee|
// |  9713|  END|Good|
// |Broken| Line|    |
// +------+-----+----+

I wouldn't say it's an elegant solution, as you asked. Parsing strings by hand is never elegant! You should use the csv data source to read this kind of input properly (or the spark-csv package for Spark < 2.x).
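For what it's worth, a minimal sketch of that csv approach, assuming you stay in the shell and feed the lines in directly via the Dataset[String] overload of the csv reader (available since Spark 2.2); with an explicit schema and the default nullValue of an empty string, the missing field should come back as null instead of crashing:

import spark.implicits._

// Let the csv source do the parsing instead of splitting by hand.
val csvDf = spark.read
  .schema(mySchema)
  .csv(l.toDS())

csvDf.show
// The "Broken,Line," record yields colC = null.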
