
I am developing Spark using Scala, and I don't have any Scala background. I haven't hit the error yet, but I am preparing an error handler for my code.

|location|arrDate|deptDate|
|--------|-------|--------|
|JFK     |1201   |1209    |
|LAX     |1208   |1212    |
|NYC     |       |1209    |
|22      |1201   |1209    |
|SFO     |1202   |1209    |

If we have data like this, I would like to store the third and fourth rows in Error.dat and then continue processing from the fifth row. In the error log I would like to record which file the row came from, the row number, and the details of the error. For logging I am currently using log4j.

What is the best way to implement that function? Can you guys help me?

  • On what conditions are the 3rd and 4th rows rejected to the error file? Commented Feb 26, 2017 at 4:15
  • @rogue-one In the 3rd row arrDate is empty, and in the 4th row the location should be a String. Those are the rejection conditions. Commented Feb 26, 2017 at 4:19

1 Answer


I am assuming all three columns are of type String. In that case I would solve this with the snippet below. I have created two udfs to check for error records:

  • whether a field has only numeric characters [isNumber]
  • whether a string field is empty [isEmpty]

code snippet

 import org.apache.spark.sql.functions.udf
 import spark.implicits._   // needed for toDF and the $"col" syntax

 // attach a 1-based row number to every record before converting to a DataFrame
 val df = rdd.zipWithIndex.map({ case ((x, y, z), index) => (index + 1, x, y, z) }).toDF("row_num", "c1", "c2", "c3")
 // true when the field contains only digit characters
 val isNumber = udf((x: String) => x.replaceAll("\\d", "") == "")
 // true when the field is empty or whitespace-only
 val isEmpty = udf((x: String) => x.trim.length == 0)
 val errDF = df.filter(isNumber($"c1") || isEmpty($"c2"))
 val validDF = df.filter(!(isNumber($"c1") || isEmpty($"c2")))


scala> df.show()
+-------+---+-----+-----+
|row_num| c1|   c2|   c3|
+-------+---+-----+-----+
|      1|JFK| 1201| 1209|
|      2|LAX| 1208| 1212|
|      3|NYC|     | 1209|
|      4| 22| 1201| 1209|
|      5|SFO| 1202| 1209|
+-------+---+-----+-----+

scala> errDF.show()
+-------+---+----+----+
|row_num| c1|  c2|  c3|
+-------+---+----+----+
|      3|NYC|    |1209|
|      4| 22|1201|1209|
+-------+---+----+----+
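Since the question also asks that Error.dat carry the file name, row number, and error details, here is a minimal sketch of that formatting logic in plain Scala. The file name, validation messages, and helper names are illustrative, not from the original post; in the job you would apply something like this to each row of `errDF`, write the lines to Error.dat, and mirror them through log4j.

```scala
// Hypothetical helper: return every validation error found in one row.
// The two checks mirror the udfs above: numeric location, empty arrDate.
def validateRow(location: String, arrDate: String): Seq[String] = {
  val errs = scala.collection.mutable.ListBuffer[String]()
  if (location.nonEmpty && location.forall(_.isDigit))
    errs += s"location '$location' is numeric, expected a string"
  if (arrDate.trim.isEmpty)
    errs += "arrDate is empty"
  errs.toList
}

// Format one Error.dat line: source file, row number, error details.
def errorLine(file: String, rowNum: Long, errs: Seq[String]): String =
  s"$file|$rowNum|${errs.mkString("; ")}"

// Example: row 4 from the sample data ("flights.dat" is a made-up file name)
val errs = validateRow("22", "1201")
println(errorLine("flights.dat", 4, errs))
// → flights.dat|4|location '22' is numeric, expected a string
```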

5 Comments

Thanks for the brilliant suggestion. Another question: what if I want to check all columns for any empty values?
@BryanK I missed your comment. You can apply the same udf to every column to check for blank strings.
This is what I did to check all columns: `val columnNames = Seq("row_num","c1","c2","c3"); var errDF = df.filter(isEmpty(columnNames.head, columnNames.tail: _*))`, but it did not work. Is there any way to check multiple columns at once?
I created the sequence because I have 46 columns, and I don't want to run the check 46 times.
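On the many-columns follow-up: in Spark you can build one combined condition instead of 46 separate filter calls, e.g. `df.columns.map(c => isEmpty(col(c))).reduce(_ || _)` (using `org.apache.spark.sql.functions.col`). The core idea, shown in plain Scala so it stands alone (the function and column names here are illustrative, not from the thread):

```scala
// Sketch: scan every (columnName, value) pair of a row at once and
// report which columns fail the emptiness check.
def emptyColumns(row: Seq[(String, String)]): Seq[String] =
  row.collect { case (name, value) if value.trim.isEmpty => name }

// Example using the third row of the sample data
val row = Seq("location" -> "NYC", "arrDate" -> "", "deptDate" -> "1209")
println(emptyColumns(row))
// → List(arrDate)
```

The same collect-over-pairs pattern works no matter how many columns there are, which avoids repeating the check per column by hand.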
