
I have a number of columns; a sample of my data is shown below. I need to check the columns for errors and generate two output files. I'm using Apache Spark 2.0 and would like to do this in an efficient way.

Schema Details
---------------
EMPID - (NUMBER)
ENAME - (STRING,SIZE(50))
GENDER - (STRING,SIZE(1))

Data
----
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,MM
1015,123MYA,F

My expected output files should be as shown below:

1.
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,NULL
1015,NULL,F

2.
EMPID,ERROR_COLUMN,ERROR_VALUE,ERROR_DESCRIPTION
1010,GENDER,"MM","OVERSIZED"
1010,GENDER,"MM","VALUE INVALID FOR GENDER"
1015,ENAME,"123MYA","NAME SHOULD BE A STRING"

Thanks

1 Answer

I have not really worked with Spark 2.0, so I'll try answering your question with a solution in Spark 1.6.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType

// Load your base data
val input = <<you input dataframe>>

// Extract the schema of your base data
val originalSchema = input.schema

// Extend your existing schema with the additional error-metadata fields
val modifiedSchema = originalSchema.add("ERROR_COLUMN", StringType, true)
                                   .add("ERROR_VALUE", StringType, true)
                                   .add("ERROR_DESCRIPTION", StringType, true)

// Write a custom validation function
def validateColumns(row: Row): Row = {
  var err_col: String = null
  var err_val: String = null
  var err_desc: String = null
  val empId = row.getAs[String]("EMPID")
  val ename = row.getAs[String]("ENAME")
  val gender = row.getAs[String]("GENDER")

  // do checking here and populate (err_col, err_val, err_desc) with values if applicable

  Row.merge(row, Row(err_col), Row(err_val), Row(err_desc))
}

// Call your custom validation function
// (mapping over a DataFrame yields an RDD[Row] in Spark 1.6)
val validatedRDD = input.map { row => validateColumns(row) }

// Reconstruct the DataFrame with the additional columns
val checkedDf = sqlContext.createDataFrame(validatedRDD, modifiedSchema)

// Filter out the rows having errors
val errorDf = checkedDf.filter($"ERROR_COLUMN".isNotNull && $"ERROR_VALUE".isNotNull && $"ERROR_DESCRIPTION".isNotNull)

// Filter out the rows having no errors
val errorFreeDf = checkedDf.filter($"ERROR_COLUMN".isNull && $"ERROR_VALUE".isNull && $"ERROR_DESCRIPTION".isNull)

I have used this approach personally and it works for me. I hope it points you in the right direction.
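The checking step that `validateColumns` leaves as a comment can be driven directly by the schema rules from the question (ENAME: STRING, SIZE(50); GENDER: STRING, SIZE(1)). Below is a minimal, Spark-independent sketch of those checks; the `ColumnError` case class, the `checkRecord` helper, and the exact rule set (letters-only names, M/F genders) are my assumptions for illustration, not part of the original answer:

```scala
// Hypothetical helper type for one detected error (illustration only)
case class ColumnError(empId: String, column: String, value: String, description: String)

// Apply the schema rules and collect every violation;
// a single value may produce more than one error (e.g. "MM" for GENDER).
def checkRecord(empId: String, ename: String, gender: String): List[ColumnError] = {
  val errors = scala.collection.mutable.ListBuffer[ColumnError]()

  // ENAME - (STRING, SIZE(50)): length limit, letters only (assumed rule)
  if (ename.length > 50)
    errors += ColumnError(empId, "ENAME", ename, "OVERSIZED")
  if (!ename.matches("[A-Za-z]+"))
    errors += ColumnError(empId, "ENAME", ename, "NAME SHOULD BE A STRING")

  // GENDER - (STRING, SIZE(1)): length limit, allowed values M/F (assumed rule)
  if (gender.length > 1)
    errors += ColumnError(empId, "GENDER", gender, "OVERSIZED")
  if (!Set("M", "F").contains(gender))
    errors += ColumnError(empId, "GENDER", gender, "VALUE INVALID FOR GENDER")

  errors.toList
}
```

With the sample data this reproduces the expected error file: 1010 gets the two GENDER errors and 1015 the ENAME error. Inside `validateColumns` you would use the detected errors to fill `err_col`, `err_val` and `err_desc`, and replace the offending value with NULL for the error-free output file.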


1 Comment

If it works for you, can you please accept the answer? Meanwhile I'll give some thought to your problem, to see if I can come up with something.
