
I have the following RDD and many just like it:

val csv = sc.parallelize(Array(
  "col1, col2, col3",
  "1, cat, dog",
  "2, bird, bee"))

I would like to convert the RDD into a dataframe where the schema is created dynamically/programmatically based on the first row of the RDD.

I would like to apply the logic to multiple similar RDDs, and I cannot specify the schema using a case class nor use spark-csv to load the data in as a dataframe from the start.

I've created a flattened dataframe, but am wondering how to break out the respective columns when creating the dataframe.

Current code:

val header = csv.first()
val data = csv.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}.toDF(header).show()

Current output:

+----------------+
|col1, col2, col3|
+----------------+
|     1, cat, dog|
|    2, bird, bee|
+----------------+
  • If this is from a csv file then read it directly as a dataframe: stackoverflow.com/questions/29704333/… Commented Nov 19, 2019 at 3:34
  • I cannot read it directly into a dataframe using spark-csv Commented Nov 19, 2019 at 3:35
  • How come? Also note that in newer Spark versions there is no need for spark-csv, as csv can be read directly without additional packages, e.g.: spark.read.format("csv").option("header", "true").load("csvfile.csv"). Commented Nov 19, 2019 at 4:15
  • Yes, that is what I meant. There are restrictions in place where I am not able to do so Commented Nov 19, 2019 at 4:23

2 Answers


In most cases it's preferable to read csv files directly as a dataframe; see e.g. Spark - load CSV file as DataFrame?.
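For reference, a minimal sketch of that direct route, assuming the data actually lives in a file (the path data.csv below is only a placeholder):

val df = spark.read
  .option("header", "true")       // use the first line as column names
  .option("inferSchema", "true")  // optionally let Spark guess column types
  .csv("data.csv")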


First you need to split the data into arrays; this applies to both the header and the RDD itself:

import spark.implicits._                   // for .toDF on an RDD (already in scope in spark-shell)
import org.apache.spark.sql.functions.col  // for the col() function used below

val header = csv.first().split(", ")

val data = csv.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}.map(_.split(", ")).toDF("arr")

Note that the above code converts the RDD to a dataframe, but it will only have a single column called arr. header, on the other hand, will be an Array[String].
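A quick way to sanity-check those intermediate results (a sketch; the output in the comments is indicative and assumes the csv RDD from the question):

header              // Array(col1, col2, col3)

data.printSchema()
// root
//  |-- arr: array (nullable = true)
//  |    |-- element: string (containsNull = true)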

The next step is to convert the dataframe with a single array column into the correct number of columns and with the correct names (based on header):

data.select((0 until header.size).map(i => col("arr")(i).alias(header(i))): _*)

This results in the desired output dataframe:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|col1|col2|col3|
|   1| cat| dog|
|   2|bird| bee|
+----+----+----+
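Since the question mentions applying the same logic to many similar RDDs, the two steps above can be wrapped in a small helper. This is only a sketch: the name toDfWithHeader is made up here, and it assumes every RDD uses ", " as the separator and carries its header in the first line:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

def toDfWithHeader(rdd: RDD[String], spark: SparkSession): DataFrame = {
  import spark.implicits._

  // the first line holds the column names
  val header = rdd.first().split(", ")

  // drop the header (this only works if it sits in partition 0, see the comment
  // under the second answer) and split the remaining lines into arrays
  val data = rdd.mapPartitionsWithIndex {
    (idx, iter) => if (idx == 0) iter.drop(1) else iter
  }.map(_.split(", ")).toDF("arr")

  // expand the single array column into properly named columns
  data.select((0 until header.size).map(i => col("arr")(i).alias(header(i))): _*)
}

val df = toDfWithHeader(csv, spark)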

You can use this code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

val csv = spark.sparkContext.parallelize(Array(
    "col1, col2, col3",
    "1, cat, dog",
    "2, bird, bee"))

// the first line holds the column names
val header = csv.first.split(",").map(_.trim)

val resultDF = spark.createDataFrame(
    // zipWithIndex numbers records, so index 0 is always the header line
    csv.zipWithIndex
      .filter(_._2 > 0)
      .map { case (str, _) => Row.fromSeq(str.split(",").map(_.trim)) },
    // build the schema from the header; every column is a plain string
    StructType(header.map(c => StructField(c, StringType)))
  )

resultDF.show(false)

Output:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |cat |dog |
|2   |bird|bee |
+----+----+----+
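Because every StructField is built with StringType, all columns come out as plain strings; a quick check (output is indicative):

resultDF.printSchema()
// root
//  |-- col1: string (nullable = true)
//  |-- col2: string (nullable = true)
//  |-- col3: string (nullable = true)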

1 Comment

Note that you need zipWithIndex to filter out the header; mapPartitionsWithIndex assigns an index per partition, not per record.
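A quick sketch of the difference the comment describes (the exact partition indices depend on how parallelize happens to split the data):

csv.zipWithIndex.collect()
// per-record index: the header is always index 0
// e.g. Array((col1, col2, col3,0), (1, cat, dog,1), (2, bird, bee,2))

csv.mapPartitionsWithIndex { (idx, iter) => iter.map(line => (idx, line)) }.collect()
// per-partition index: the header can end up in any partition
// e.g. Array((2,col1, col2, col3), (5,1, cat, dog), (7,2, bird, bee))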
