
I have a CSV file with hundreds of columns. I wrote the values in the file with an explicit F suffix so that they would be read as floats, but Spark infers them as doubles. This is troubling because the data size is huge and I want to avoid casting all the columns again. I could not find any clue while searching, so I am wondering if there is a solution to this problem. I am using Spark 3.3 and the issue is demonstrated below:

$ cat test.csv
Word    Wt1     Wt2
hello   1.0F    2.0F
hi      2.0F    4.0F

In spark-shell:

scala> val x = 2.0F
val x: Float = 2.0

scala> val df = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").option("inferSchema", "true").csv("test.csv")
val df: org.apache.spark.sql.DataFrame = [Word: string, Wt1: double ... 1 more field]

scala> df.show()
+-----+---+---+
| Word|Wt1|Wt2|
+-----+---+---+
|hello|1.0|2.0|
|   hi|2.0|4.0|
+-----+---+---+

scala> df.dtypes
val res6: Array[(String, String)] = Array((Word,StringType), (Wt1,DoubleType), (Wt2,DoubleType))

PS: My use case has strings in the first 5 columns and the rest are all floats. However, the exact number of columns is not known a priori.

PS2: After including the solution suggested by @leleogere, which worked perfectly in spark-shell, I ran into overload errors in my own code. The reason was that my two column lists were ListBuffers and needed to be converted to Arrays.
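Roughly, the fix looked like this (a sketch only, reusing the variable names from the answer below; adapt to your own column lists):

// stringCols and floatCols came back as ListBuffers in my code,
// so convert them to Arrays before building the StructType
val schema = StructType(
  stringCols.toArray.map(c => StructField(c, StringType)) ++
  floatCols.toArray.map(c => StructField(c, FloatType))
)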

1 Answer


You can manually specify the schema to the CSV reader instead of letting it infer the schema by itself:

import org.apache.spark.sql.types.{StructType, StructField, StringType, FloatType}

val numberOfStringColumns = 1  // 5 in your case

val columnsNames = spark.read.option("delimiter", "\t").option("header", "true").csv("test.csv").columns
// I don't think that the above line is a problem because
// no data is actually read as we don't call any action

val (stringCols, floatCols) = columnsNames.splitAt(numberOfStringColumns)
// the above line assumes that the string columns are the first ones of the dataframe

val schema = StructType(
  stringCols.map(c => StructField(c, StringType)) ++
  floatCols.map(c => StructField(c, FloatType))
)

val df = spark.read.format("csv").option("delimiter", "\t").option("header", "true").schema(schema).csv("test.csv")

df.show()
// +-----+---+---+
// | Word|Wt1|Wt2|
// +-----+---+---+
// |hello|1.0|2.0|
// |   hi|2.0|4.0|
// +-----+---+---+

df.dtypes
// Array((Word,StringType), (Wt1,FloatType), (Wt2,FloatType))

EDIT: this works for Scala 2.12 and Spark 3.2; see the comments for a fix for Scala 2.13 and Spark 3.3.
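A sketch of that variant (untested on my side; it assumes the same columnsNames and numberOfStringColumns as above, and the only change is forcing the two split column lists to Arrays before building the StructType):

val (stringCols, floatCols) = columnsNames.splitAt(numberOfStringColumns)

val schema = StructType(
  stringCols.toArray.map(c => StructField(c, StringType)) ++
  floatCols.toArray.map(c => StructField(c, FloatType))
)

val df = spark.read.option("delimiter", "\t").option("header", "true").schema(schema).csv("test.csv")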


Comments

Not sure that it can be done with that F, but I might have another idea in mind, let me try it and update the answer.
What about this version? I get the column names, split them into string columns and float columns, then generate the appropriate schema and read the CSV with the correct schema.
Looks like what I was trying to get done. Will check and get back, thank you.
Did you run exactly this code? Like copy-pasting? It does work for me. You don't have to specify Seq or Array, as string_cols.map(c => StructField(c, StringType)) ++ float_cols.map(c => StructField(c, FloatType)) is already an Array: both string_cols and float_cols are Arrays, using .map keeps them as Arrays, and joining them with ++ yields another Array.
I found the bug. Splitting into string_cols and float_cols was creating ListBuffers. When I converted them through .toArray, the code worked beautifully!
