
I have some incoming data as rowValues. I need to apply a particular schema and create a DataFrame. Here is my code:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowValues = List("12","F","1980-10-11,1980-10-11T10:10:20")
val rdd = sqlContext.sparkContext.parallelize(Seq(rowValues))
val rowRdd = rdd.map(v => Row(v: _*))

var fieldSchema = ListBuffer[StructField]()

fieldSchema += StructField("C0", IntegerType, true)
fieldSchema += StructField("C1", StringType, true)
fieldSchema += StructField("C2", TimestampType, true)
val schema = StructType(fieldSchema.toList)

val newRow = sqlContext.createDataFrame(rowRdd, schema)
newRow.printSchema()   // the new schema prints here
newRow.show()          // this fails with a ClassCastException

This fails with org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 16, localhost): java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp

How do I apply this schema?

2 Answers


Rather than applying the schema up front, you can cast the columns of the DataFrame to match your schema.

You can use withColumn together with the cast function to change the datatype of a column.

Below is a simple example:

import spark.implicits._
import org.apache.spark.sql.types.{IntegerType, TimestampType}
import org.apache.spark.sql.functions.unix_timestamp

val df = spark.sparkContext.parallelize(Seq(
  ("12","F","1980-10-11T10:10:20"),
  ("12","F","1980-10-11T10:10:20")
)).toDF("c0", "c1", "c2")

// cast the ISO-formatted string date directly to a timestamp
val newDf = df.withColumn("c0", df("c0").cast(IntegerType))
  .withColumn("c2", df("c2").cast(TimestampType))

// alternatively, parse the string with an explicit pattern; unix_timestamp
// returns epoch seconds, so cast the result to get a timestamp column
val newDf2 = df.withColumn("c0", df("c0").cast(IntegerType))
  .withColumn("c2", unix_timestamp(df("c2"), "yyyy-MM-dd'T'HH:mm:ss").cast(TimestampType))

newDf.show(false)

newDf.printSchema()
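
With either variant, printSchema should report c0 as integer and c2 as timestamp, and show(false) should print something like:

+---+---+-------------------+
|c0 |c1 |c2                 |
+---+---+-------------------+
|12 |F  |1980-10-11 10:10:20|
|12 |F  |1980-10-11 10:10:20|
+---+---+-------------------+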

Hope this helps!


2 Comments

Thanks very much, this has helped, but what if the schema is of the following type? In that case I am not able to typecast: |-- C2: struct (nullable = true) | |-- Gender: string (nullable = true)
Yes, I would like to use my own schema too. The above withColumn/cast worked for primitive types, but what about a custom schema? The schema will be known only at run time ... |-- C0: integer (nullable = true) |-- C1: integer (nullable = true) |-- C2: struct (nullable = true) | |-- Gender: string (nullable = true)
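
A plain cast cannot produce a nested struct column like the one asked about in these comments, but the struct function can assemble one from existing columns. A minimal sketch, assuming the Gender value comes from an existing column (using c1 as the source and info as the target name are assumptions for illustration):

import org.apache.spark.sql.functions.struct

// minimal sketch: wrap an existing column into a nested struct column;
// the target name "info" and the choice of c1 as the source are assumptions
val withStruct = newDf.withColumn("info", struct(newDf("c1").as("Gender")))
withStruct.printSchema()   // info: struct<Gender: string>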

Your input data are all strings, but your schema declares C0 as integer, C1 as string, and C2 as timestamp, which is why you get the casting error. Your timestamp string is even more complicated, since it contains two date values separated by a comma.

If you are just looking to get a DataFrame, change all the column datatypes to StringType and it will work:

fieldSchema += StructField("C0", StringType, true, null)
fieldSchema += StructField("C1", StringType, true, null)
fieldSchema += StructField("C2", StringType, true, null)

You should then get:

+---+---+------------------------------+
|C0 |C1 |C2                            |
+---+---+------------------------------+
|12 |F  |1980-10-11,1980-10-11T10:10:20|
+---+---+------------------------------+

If you insist on keeping your schema, the following code should give you a better idea:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, unix_timestamp}

val rowValues = List("12","F","1980-10-11,1980-10-11T10:10:20")
val rdd = sqlContext.sparkContext.parallelize(Seq(rowValues))

// convert each value to the type the schema expects: parse the int, keep the
// string, and keep only the second date of the comma-separated pair
val rowRdd = rdd.map(v => Row(v(0).toInt, v(1), v(2).split(",")(1).replace("T", " ")))

var fieldSchema = ListBuffer[StructField]()

fieldSchema += StructField("C0", IntegerType, true)
fieldSchema += StructField("C1", StringType, true)
fieldSchema += StructField("C2", StringType, true)
val schema = StructType(fieldSchema.toList)

// unix_timestamp parses the string to epoch seconds; casting gives a timestamp
val newRow = sqlContext.createDataFrame(rowRdd, schema)
  .withColumn("C2", unix_timestamp(col("C2")).cast(TimestampType))
newRow.printSchema()   // C2 is now a timestamp
newRow.show(false)

You can also do it with a case class:

import java.sql.Timestamp
import java.text.SimpleDateFormat
import sqlContext.implicits._

def convertToDate(dateTime: String): Timestamp = {
  // MM = month and HH = 24-hour clock (mm would be minutes, hh a 12-hour clock)
  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val utilDate = formatter.parse(dateTime)
  new Timestamp(utilDate.getTime)
}

val rowValues = List("12","F","1980-10-11 10:10:20")
val rdd = sqlContext.sparkContext.parallelize(Seq(rowValues))

val rowRdd = rdd.map(v => Pratap(v(0).toInt, v(1), convertToDate(v(2))))

val newRow = rowRdd.toDF
newRow.printSchema()
newRow.show(false)

And the case class should be defined outside the main class:

case class Pratap(C0: Int, C1: String, C2: java.sql.Timestamp)

5 Comments

Thanks for the quick reply, but I want exactly this schema: C0 as integer, C1 as string, and C2 as timestamp. Is there a way to create the DataFrame with this schema?
Thanks, I would like to use my own schema too. The above withColumn/cast worked for primitive types, but what about a custom schema? The schema will be known only at run time ... |-- C0: integer (nullable = true) |-- C1: integer (nullable = true) |-- C2: struct (nullable = true) | |-- Gender: string (nullable = true)
Updated my answer :) If it still doesn't answer what you are looking for, please explain what dynamic means.
Thanks, but I can't define the case class right away since the schema will be known only at run time. There should be a way to generate it depending on the incoming schema. Another thing is that the column types might not be primitive ones; there will be custom data types like AType, BType, etc. instead of int and string. So is there a way to apply these custom data types?
Can you elaborate on how you are creating the dynamic schema? You can update your question :)
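
For the runtime-schema question raised in these comments, a StructType can be assembled programmatically from whatever field description arrives at run time. A minimal sketch, assuming the field names and types come from some runtime source (fieldDefs below is a hypothetical stand-in for that input):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// hypothetical runtime description of the schema, e.g. parsed from a config
val fieldDefs: Seq[(String, DataType)] = Seq(
  ("C0", IntegerType),
  ("C1", StringType),
  ("C2", StructType(Seq(StructField("Gender", StringType, true))))
)

// build the StructType programmatically instead of hard-coding it
val runtimeSchema = StructType(fieldDefs.map { case (name, dataType) =>
  StructField(name, dataType, nullable = true)
})

// rows must already hold values of the matching JVM types;
// a nested Row supplies the struct field
val data = sqlContext.sparkContext.parallelize(Seq(Row(12, "F", Row("F"))))
val df = sqlContext.createDataFrame(data, runtimeSchema)
df.printSchema()   // C2 appears as a struct with a Gender field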
