val tableDF = spark.read.option("delimiter",",").csv("/Volumes/Data/ap/click/test.csv")
import org.apache.spark.sql.types.{StringType, StructField, StructType, IntegerType}

val schemaTd = StructType(List(StructField("time_id",IntegerType),StructField("week",IntegerType),StructField("month",IntegerType),StructField("calendar",StringType)))

val result = spark.createDataFrame(tableDF,schemaTd)

A sample of the data in test.csv is below:

6659,951,219,2018-03-25 00:00:00
6641,949,219,2018-03-07 00:00:00
6645,949,219,2018-03-11 00:00:00
6638,948,219,2018-03-04 00:00:00
6646,950,219,2018-03-12 00:00:00
6636,948,219,2018-03-02 00:00:00
6643,949,219,2018-03-09 00:00:00

All the columns except the last one hold Int values in the file, but I am still getting an error:

scala> result.show
2018-05-20 17:08:54 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, time_id), IntegerType) AS time_id#23
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, week), IntegerType) AS week#24
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, month), IntegerType) AS month#25
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, calendar), StringType), true, false) AS calendar#26
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)

1 Answer

In this case you should provide the schema to the DataFrameReader:

import org.apache.spark.sql.types._

val schemaTd = StructType(List(
   StructField("time_id",IntegerType),
   StructField("week",IntegerType),
   StructField("month",IntegerType),
   StructField("calendar",StringType)))

val tableDF = spark.read.option("delimiter",",")
  .schema(schemaTd)
  .csv("/Volumes/Data/ap/click/test.csv")

When a Dataset is created from an RDD[Row] (I assume your actual code is spark.createDataFrame(tableDF.rdd, schemaTd), otherwise it shouldn't really compile), the types of the values have to be consistent with the schema. You cannot provide String values (the default type produced by the CSV reader) and declare the schema with IntegerType.
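If the data has already been read as strings, one alternative is to cast the columns explicitly instead of re-applying a schema. The following is a minimal sketch, not part of the original answer. It assumes Spark 2.2 or later (for reading CSV from a Dataset[String]) and feeds two of the sample rows in memory so that it does not depend on the file path. The names raw and typed are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// In spark-shell a session already exists; getOrCreate() simply reuses it.
val spark = SparkSession.builder().master("local[*]").appName("cast-sketch").getOrCreate()
import spark.implicits._

// Two rows from the sample, supplied in memory (assumption: stands in for test.csv).
val lines = Seq(
  "6659,951,219,2018-03-25 00:00:00",
  "6641,949,219,2018-03-07 00:00:00").toDS()

// Without a schema the CSV reader yields StringType columns named _c0 ... _c3.
val raw = spark.read.option("delimiter", ",").csv(lines)

// Cast the first three columns to IntegerType and give every column its intended name.
val typed = raw.select(
  col("_c0").cast(IntegerType).as("time_id"),
  col("_c1").cast(IntegerType).as("week"),
  col("_c2").cast(IntegerType).as("month"),
  col("_c3").as("calendar"))

typed.printSchema()
```

The same cast call can be used with withColumn to change the type of a single column after loading. Passing the schema to the reader, as shown above, is usually simpler when the types are known up front.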


1 Comment

Great, that helped. So the schema can't be changed once it is declared in .schema? What if I need to change the data types of columns? How can I do that?
