
I am trying to convert the data types of some columns based on a case class.

val simpleDf = Seq(
  ("James", 34, "2006-01-01", "true", "M", 3000.60),
  ("Michael", 33, "1980-01-10", "true", "F", 3300.80),
  ("Robert", 37, "1995-01-05", "false", "M", 5000.50)
).toDF("firstName", "age", "jobStartDate", "isGraduated", "gender", "salary")

// Output
simpleDf.printSchema()
root
|-- firstName: string (nullable = true)
|-- age: integer (nullable = false)
|-- jobStartDate: string (nullable = true)
|-- isGraduated: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: double (nullable = false)

Here I want to change the data type of jobStartDate to Timestamp and isGraduated to Boolean. Is that conversion possible using the case class? I am aware this can be done by casting each column, but in my case I need to map the incoming DF based on a defined case class.

case class empModel(firstName: String,
                    age: Integer,
                    jobStartDate: java.sql.Timestamp,
                    isGraduated: Boolean,
                    gender: String,
                    salary: Double)

val newDf = simpleDf.as[empModel].toDF
newDf.show(false)

I am getting errors because of the string to timestamp conversion. Is there any workaround?

  • In fact, I receive these files from another application and all of these columns are of string type. To change the column types in the DF, I need to operate on each DF manually, and I have almost 50 data frames where I need to modify some column data types, so I am wondering if that can be handled dynamically. I have been looking at Scala reflection, but it has not been much help. Commented Feb 19, 2021 at 18:57
  • Does this answer your question? How to pass case class as a variable into ScalaReflection Commented Feb 19, 2021 at 20:21
  • I checked the reflection approach, but it is not the solution I am looking for. There is an answer posted that converts the data types properly according to printSchema, but when I do df.show() it throws an exception. Commented Feb 19, 2021 at 21:13
  • For timestamp conversion, newDf.withColumn("jobStartDate", to_timestamp($"jobStartDate", "yyyy-MM-dd")) works fine for me (on Spark 2.4). Commented Feb 19, 2021 at 22:03
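
A minimal sketch of the approach from the last comment, assuming import spark.implicits._ is in scope (as it must be for the toDF call above):

import org.apache.spark.sql.functions.to_timestamp

// Cast only the two problem columns; the pattern matches the "yyyy-MM-dd" strings above
val fixedDf = simpleDf
  .withColumn("jobStartDate", to_timestamp($"jobStartDate", "yyyy-MM-dd"))
  .withColumn("isGraduated", $"isGraduated".cast("boolean"))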

1 Answer


You can generate the schema from the case class using ScalaReflection:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection


val schema = ScalaReflection.schemaFor[empModel].dataType.asInstanceOf[StructType]

Now you can pass this schema when you load your files into a dataframe.
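
For example, assuming the incoming files are CSV with a header row (the format and path here are hypothetical), a sketch:

// Apply the case-class schema at load time instead of casting afterwards;
// "/path/to/incoming/files" is a placeholder for your actual location
val loadedDf = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("/path/to/incoming/files")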

Or, if you prefer to cast some or all columns after you read the dataframe, you can iterate over the schema fields and cast each column to its corresponding data type, using foldLeft for example:

// Fold over the case-class schema, casting each column to its declared type
val df = schema.fields.foldLeft(simpleDf) {
  (df, field) => df.withColumn(field.name, df(field.name).cast(field.dataType))
}

df.printSchema

//root
// |-- firstName: string (nullable = true)
// |-- age: integer (nullable = true)
// |-- jobStartDate: timestamp (nullable = true)
// |-- isGraduated: boolean (nullable = false)
// |-- gender: string (nullable = true)
// |-- salary: double (nullable = false)

df.show
//+---------+---+-------------------+-----------+------+------+
//|firstName|age|       jobStartDate|isGraduated|gender|salary|
//+---------+---+-------------------+-----------+------+------+
//|    James| 34|2006-01-01 00:00:00|       true|     M|3000.6|
//|  Michael| 33|1980-01-10 00:00:00|       true|     F|3300.8|
//|   Robert| 37|1995-01-05 00:00:00|      false|     M|5000.5|
//+---------+---+-------------------+-----------+------+------+
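
As a side note, if you prefer to stay off the internal catalyst API, the same StructType can be derived from the case class through the public Encoders API (a sketch, equivalent in result to the ScalaReflection call above):

import org.apache.spark.sql.Encoders

// Same schema as ScalaReflection.schemaFor, via the public encoder machinery
val schemaFromEncoder = Encoders.product[empModel].schema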