I have the Scala Spark code below, which works well but should not.

The 2nd column has data of mixed types, whereas in the schema I have defined it as IntegerType. My actual program has over 100 columns and keeps deriving multiple child DataFrames after transformations.

How can I validate that the fields of an RDD or DataFrame hold values of the correct data type, so that I can ignore invalid rows or replace the offending values with a default? Any further pointers on data quality checks with DataFrames or RDDs are appreciated.

var theSeq = Seq(("X01", "41"),
    ("X01", 41),
    ("X01", 41),
    ("X02", "ab"),
    ("X02", "%%"))

val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))

val theSchema = StructType(Seq(StructField("ID", StringType, true),
    StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()  

2 Answers

First of all, passing a schema is simply a way to avoid type inference. It is not validated or enforced during DataFrame creation. On a side note, I wouldn't describe a ClassCastException as working well. For a moment I thought you had actually found a bug.

I think the important question is how you get data like theSeq / newRdd in the first place. Is it something you parse yourself, or is it received from an external component? Simply looking at the types (Seq[(String, Any)] and RDD[(String, Any)] respectively), you already know it is not valid input for a DataFrame. Probably the way to handle things at this level is to embrace static typing. Scala provides quite a few neat ways to handle unexpected conditions (Try, Either, Option), of which the last is the simplest and, as a bonus, works well with Spark SQL. A rather simplistic way to handle things could look like this:

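import org.apache.spark.rdd.RDD

// Some(value) when the runtime type matches, None otherwise
def validateInt(x: Any): Option[Int] = x match {
  case x: Int => Some(x)
  case _ => None
}

def validateString(x: Any): Option[String] = x match {
  case x: String => Some(x)
  case _ => None
}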

val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map{
  case (id, age) => (validateString(id), validateInt(age))}

Since Options compose easily, you can add additional checks like this:

def validateAge(age: Int) = {
  if(age >= 0 && age < 150) Some(age)
  else None
}

val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map{
  case (id, age) => (id, age.flatMap(validateAge))}
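If the reason for a rejection matters, the same idea can be written with Either and Try, which are mentioned above but not shown. The following is only a sketch: parseInt, checkAge and validateAgeField are hypothetical helper names, and unlike validateInt it deliberately accepts numeric strings such as "41".

```scala
import scala.util.Try

// Hypothetical helpers for illustration, not part of the original answer.
// Left carries a human-readable reason, Right carries the validated value.
def parseInt(x: Any): Either[String, Int] = x match {
  case i: Int    => Right(i)
  case s: String => Try(s.trim.toInt).toOption.toRight(s"not an integer: '$s'")
  case other     => Left(s"unsupported type: $other")
}

// Same range rule as validateAge, but reporting why a value was rejected
def checkAge(age: Int): Either[String, Int] =
  if (age >= 0 && age < 150) Right(age) else Left(s"age out of range: $age")

// Pattern matching keeps this independent of whether Either is right-biased
// in the Scala version in use
def validateAgeField(x: Any): Either[String, Int] = parseInt(x) match {
  case Right(age) => checkAge(age)
  case Left(err)  => Left(err)
}
```

Mapping validateAgeField over the second tuple element would give each row either an age or an error message, so rejected rows can be counted or logged before being dropped.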

Next, instead of Row, which is a very crude container, I would use case classes:

case class Record(id: Option[String], age: Option[Int])

val records: RDD[Record] = newRddValidated.map{case (id, age) => Record(id, age)}

At this moment all you have to do is call toDF:

import org.apache.spark.sql.DataFrame
import sqc.implicits._ // brings toDF into scope; sqc is the SQLContext from the question

val df: DataFrame = records.toDF
df.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)

That was the hard, but arguably more elegant, way. A faster one is to let the SQL casting system do the job for you. First, let's convert everything to Strings:

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
  p => (p._1.toString, p._2.toString))

Next, create a DataFrame and cast each column to its expected type:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val df: DataFrame = stringRdd.toDF("id", "age")

val expectedTypes = Seq(StringType, IntegerType)
val exprs: Seq[Column] = df.columns.zip(expectedTypes).map{
  case (c, t) => col(c).cast(t).alias(c)}

val dfProcessed: DataFrame = df.select(exprs: _*)

And the result:

dfProcessed.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)


dfProcessed.show

// +---+----+
// | id| age|
// +---+----+
// |X01|  41|
// |X01|  41|
// |X01|  41|
// |X02|null|
// |X02|null|
// +---+----+
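Once invalid values have become null, the two things the question asks for (ignoring invalid rows, or substituting a default) can be done with the DataFrame's na functions. The sketch below is not from the original answer and rests on a few assumptions: it uses a local SparkSession (Spark 2.x or later) rather than the sc / sqc setup above, it relies on non-ANSI casting so that a failed cast yields null, and the sample rows and the age_int column name are made up. Keeping the raw string column next to the cast result also makes it possible to tell malformed values apart from values that were missing to begin with.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Assumption: Spark 2.x+ with a local SparkSession, unlike the sc / sqc setup above
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("cast-validation-sketch")
  // Assumption: non-ANSI casting, so a failed cast yields null instead of an error
  .config("spark.sql.ansi.enabled", "false")
  .getOrCreate()
import spark.implicits._

// Made-up sample rows: one valid, one malformed, one genuinely missing
val rows: Seq[(String, String)] = Seq(("X01", "41"), ("X02", "ab"), ("X03", null))
val raw = rows.toDF("id", "age")

// Keep the raw string next to the cast result so the two can be compared
val withCast = raw.withColumn("age_int", col("age").cast(IntegerType))

// Data quality check: value was present but did not survive the cast
val malformed = withCast.filter(col("age").isNotNull && col("age_int").isNull)

// Option 1: ignore every row without a usable age
val dropped = withCast.na.drop(Seq("age_int"))

// Option 2: keep all rows and substitute a default value (0 is arbitrary)
val defaulted = withCast.na.fill(Map("age_int" -> 0))
```

With many columns, the same comparison can be generated per column from the list of expected types, in the same way exprs is built above.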
2 Comments

The code I posted was only to demonstrate the issue I have. Thanks a lot for the reply. Although the 1st solution is elegant, it hardly works when there are 125 columns in a file and the file is dynamic in nature; even a case class wouldn't work, so Row is the only solution. The casting solution is appropriate in my case. Thank you very much. SS
That's why we have metaprogramming. Arguably there are better languages where it comes more naturally than in Scala, but it is perfectly possible.
In version 1.4 or older:

import org.apache.spark.sql.execution.debug._
theNewDF.typeCheck

It was removed via SPARK-9754, though. I haven't checked, but I think typeCheck had become sqlContext.debug beforehand.
