I need to replace the blank records in one field of my DataFrame with "0".

Here is my code:

import sqlContext.implicits._

case class CInspections (business_id:Int, score:String, date:String, type1:String)

val baseDir = "/FileStore/tables/484qrxx21488929011080/"
val raw_inspections = sc.textFile (s"$baseDir/inspections_plus.txt")
val raw_inspectionsmap = raw_inspections.map ( line => line.split ("\t"))
val raw_inspectionsRDD = raw_inspectionsmap.map ( raw_inspections => CInspections (raw_inspections(0).toInt,raw_inspections(1), raw_inspections(2),raw_inspections(3)))
val raw_inspectionsDF = raw_inspectionsRDD.toDF
raw_inspectionsDF.createOrReplaceTempView ("Inspections")
raw_inspectionsDF.printSchema
raw_inspectionsDF.show()

I am using a case class and then converting the RDD to a DataFrame. I need "score" to be an Int because I have to perform some operations on it and sort by it. But if I declare it as score:Int, I get an error for the blank values:

java.lang.NumberFormatException: For input string: "" 

+-----------+-----+--------+--------------------+
|business_id|score|    date|               type1|
+-----------+-----+--------+--------------------+
|         10|     |20140807|Reinspection/Foll...|
|         10|   94|20140729|Routine - Unsched...|
|         10|     |20140124|Reinspection/Foll...|
|         10|   92|20140114|Routine - Unsched...|
|         10|   98|20121114|Routine - Unsched...|
|         10|     |20120920|Reinspection/Foll...|
|         17|     |20140425|Reinspection/Foll...|
+-----------+-----+--------+--------------------+

I need the score field as an Int because the query below sorts it as a String, not an Int, and gives the wrong result:

sqlContext.sql("""select raw_inspectionsDF.score  from raw_inspectionsDF where score <>"" order by score""").show()

+-----+
|score|
+-----+
|  100|
|  100|
|  100|
+-----+
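
The ordering above is what character-by-character string comparison produces: "100" sorts ahead of "92" because '1' precedes '9'. A minimal plain-Scala sketch of the effect, with no Spark involved (the object name SortDemo is hypothetical and the sample values are taken from the tables in this question):

```scala
object SortDemo {
  // Scores as they arrive from the text file: plain strings.
  val asStrings: Seq[String] = Seq("94", "92", "98", "100")

  // String ordering compares character by character, so "100" comes first.
  val sortedAsStrings: Seq[String] = asStrings.sorted

  // Converting to Int first gives the numeric ordering.
  val sortedAsInts: Seq[Int] = asStrings.map(_.toInt).sorted

  def main(args: Array[String]): Unit = {
    println(sortedAsStrings) // List(100, 92, 94, 98)
    println(sortedAsInts)    // List(92, 94, 98, 100)
  }
}
```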

1 Answer

An empty string can't be converted to an Integer. You need to make score nullable so that a missing value is represented as null. You can try the following:

import scala.util.{Try, Success, Failure}

1) Define a custom parse function that returns None if the string can't be converted to an Int (in your case, the empty string):

def parseScore(s: String): Option[Int] = {
  // Try captures the NumberFormatException that toInt throws on "" or other non-numeric input
  Try(s.toInt) match {
    case Success(x) => Some(x)
    case Failure(_) => None
  }
}
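
As a quick check of how such a parse function behaves, here is a small standalone sketch. It assumes a more compact variant built on Try(...).toOption, which should be equivalent to the pattern match above; the names ParseScoreDemo and parseScoreCompact are hypothetical and not part of the original answer.

```scala
import scala.util.Try

object ParseScoreDemo {
  // Hypothetical compact variant of parseScore: toOption turns
  // Success(x) into Some(x) and any Failure into None.
  def parseScoreCompact(s: String): Option[Int] = Try(s.toInt).toOption

  def main(args: Array[String]): Unit = {
    println(parseScoreCompact("94"))  // Some(94)
    println(parseScoreCompact(""))    // None
    println(parseScoreCompact("n/a")) // None
  }
}
```

Any non-numeric input, not just the empty string, ends up as None and therefore as null in the DataFrame.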

2) Declare the score field in your case class as Option[Int]:

case class CInspections (business_id:Int, score: Option[Int], date:String, type1:String)

val raw_inspections = sc.textFile("test.csv")
val raw_inspectionsmap = raw_inspections.map(line => line.split("\t"))

3) Use the custom parseScore function to parse the score field:

val raw_inspectionsRDD = raw_inspectionsmap.map(raw_inspections => 
    CInspections(raw_inspections(0).toInt, parseScore(raw_inspections(1)), 
                 raw_inspections(2),raw_inspections(3)))

val raw_inspectionsDF = raw_inspectionsRDD.toDF
raw_inspectionsDF.createOrReplaceTempView ("Inspections")

raw_inspectionsDF.printSchema
//root
// |-- business_id: integer (nullable = false)
// |-- score: integer (nullable = true)
// |-- date: string (nullable = true)
// |-- type1: string (nullable = true)

raw_inspectionsDF.show()

+-----------+-----+----+-----+
|business_id|score|date|type1|
+-----------+-----+----+-----+
|          1| null|   a|    b|
|          2|    3|   s|    k|
+-----------+-----+----+-----+

4) Once the file is parsed correctly, you can replace the null values with 0 using the fill function from the DataFrame's na functions:

raw_inspectionsDF.na.fill(0).show
+-----------+-----+----+-----+
|business_id|score|date|type1|
+-----------+-----+----+-----+
|          1|    0|   a|    b|
|          2|    3|   s|    k|
+-----------+-----+----+-----+
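
Note that na.fill(0) replaces nulls in every numeric column, not only score. If only score should be touched, the column list can be passed explicitly. The following is a sketch under stated assumptions: it builds its own local SparkSession and two made-up rows mirroring the sample output above, and the names FillScoreDemo and fillScore are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object FillScoreDemo {
  // Replace nulls with 0 in the score column only; other columns are left as they are.
  def fillScore(df: DataFrame): DataFrame = df.na.fill(0, Seq("score"))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in a notebook a session usually already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fill-score-demo")
      .getOrCreate()
    import spark.implicits._

    // Two made-up rows with the same shape as the sample data.
    val df = Seq[(Int, Option[Int], String, String)](
      (1, None, "a", "b"),
      (2, Some(3), "s", "k")
    ).toDF("business_id", "score", "date", "type1")

    // score is an integer column, so orderBy sorts it numerically.
    fillScore(df).orderBy($"score").show()

    spark.stop()
  }
}
```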
4 Comments

Thank you so much for your prompt reply! It's working now. :)
Can I write a SQL query inside sqlContext.sql as below? I am getting an error for this query: sqlContext.sql("""select CBusinesses.BUSINESS_ID,CBusinesses.name, CBusinesses.address, CBusinesses.city, CBusinesses.postal_code, CBusinesses.latitude, CBusinesses.longitude, Inspections_notnull.score from CBusinesses, Inspections_notnull where Inspections_notnull.score <>0 and CBusinesses.BUSINESS_ID=Inspections_notnull.BUSINESS_ID """).show() java.lang.NumberFormatException: For input string: ""
I don't really know the answer, but it seems you are trying to combine two tables. Maybe you want a join?
Yes. I want to calculate "Which 10 businesses got the lowest scores?" I have two tables, "Businesses" and "inspections", with business_id as the common key. The query works fine in SQL, but the same query doesn't work in Spark. How can I join the two tables on one column and calculate the top score using Spark SQL? I also tried //val df = businessesDF.join(raw_inspectionsDF, businessesDF.col("BUSINESS_ID") == raw_inspectionsDF.col("BUSINESS_ID")) but it gives an error too.
