I am trying to read multiple CSV files from a path into a single RDD. The path contains many CSVs. Is there a way to skip the headers while reading all of them into the RDD? Or can I strip the headers from spotsRDD without using filter, and without reading each CSV individually and then unioning the results?

val path = "file:///home/work/csvs/*"
val spotsRDD = sc.textFile(path)
println(spotsRDD.count())

Thanks

  • Which version of Spark are you using? Commented Oct 17, 2016 at 15:52
  • @VladoDemcak: Unfortunately Spark 1.0.0. We need to stick to RDDs for now, until we upgrade. Commented Oct 17, 2016 at 15:58

1 Answer

It is a pity you are on Spark 1.0.0.

You could use the CSV Data Source for Apache Spark (spark-csv), but that library requires Spark 1.3+. Its functionality was also inlined into Spark 2.x.

Still, we can look at how it works and implement something similar.

In com/databricks/spark/csv/DefaultSource.scala there is:

val useHeader = parameters.getOrElse("header", "false")

and then in com/databricks/spark/csv/CsvRelation.scala there is:

// If header is set, make sure firstLine is materialized before sending to executors.
val filterLine = if (useHeader) firstLine else null

baseRDD().mapPartitions { iter =>
  // When using header, any input line that equals firstLine is assumed to be header
  val csvIter = if (useHeader) {
    iter.filter(_ != filterLine)
  } else {
    iter
  }
  parseCSV(csvIter, csvFormat)
}

So if we assume that no data row is identical to the header line, we can do something like the example below:

CSV example file:

Latitude,Longitude,Name
48.1,0.25,"First point"
49.2,1.1,"Second point"
47.5,0.75,"Third point"

scala> val csvDataRdd = sc.textFile("test.csv")
csvDataRdd: org.apache.spark.rdd.RDD[String] = test.csv MapPartitionsRDD[24] at textFile at <console>:24

scala> val header = csvDataRdd.first
header: String = Latitude,Longitude,Name

scala> val csvDataWithoutHeaderRdd = csvDataRdd.mapPartitions{iter => iter.filter(_ != header)}
csvDataWithoutHeaderRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at mapPartitions at <console>:28

scala> csvDataWithoutHeaderRdd.foreach(println)
49.2,1.1,"Second point"
48.1,0.25,"First point"
47.5,0.75,"Third point"
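
The same comparison should carry over to the multi-file case from the question, as long as every CSV under the path starts with the identical header line: the filter compares each line to the header, so it does not matter which file or partition a line came from. Below is a minimal sketch of that idea. `HeaderFilter`, `dropHeader` and `HeaderFilterDemo` are hypothetical names used only for illustration, and the two in-memory sequences stand in for the lines that `sc.textFile` would return for a glob path. The Spark usage is shown in comments and assumes a `SparkContext` named `sc`, as in the spark-shell.

```scala
object HeaderFilter {
  // Hypothetical helper, not part of Spark or spark-csv:
  // keeps every line that is not equal to the header.
  def dropHeader(lines: Iterator[String], header: String): Iterator[String] =
    lines.filter(_ != header)

  // Assumed usage in the spark-shell, where `sc` is the SparkContext and
  // every file under the path starts with the same header line:
  //
  //   val spotsRDD = sc.textFile("file:///home/work/csvs/*")
  //   val header = spotsRDD.first()
  //   val spotsWithoutHeaders =
  //     spotsRDD.mapPartitions(iter => HeaderFilter.dropHeader(iter, header))
}

object HeaderFilterDemo {
  // Two in-memory "files" that share one header.
  val fileA = Seq("Latitude,Longitude,Name", "48.1,0.25,\"First point\"")
  val fileB = Seq("Latitude,Longitude,Name", "49.2,1.1,\"Second point\"")

  def main(args: Array[String]): Unit = {
    val allLines = fileA ++ fileB
    val header = allLines.head
    // Both header lines are removed, one per simulated file.
    HeaderFilter.dropHeader(allLines.iterator, header).foreach(println)
  }
}
```

Two caveats: a data row that happens to be identical to the header is dropped as well, and files whose headers differ from the first file's header keep their header lines.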

1 Comment

This won't work when multiple CSV files are read into a single RDD, will it?
