1

I've made this piece of code :

case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

object TestSparkDataFrame extends App{

  System.setProperty("hadoop.home.dir", "E:\\Programmation\\Libraries\\hadoop")
  val conf = new SparkConf().setAppName("TestSparkDataFrame").set("spark.driver.memory","4g").setMaster("local[*]")
  val session = SparkSession.builder().config(conf).getOrCreate()

  import session.implicits._

  def createAndPrintSchemaRawPanda(session:SparkSession):DataFrame = {
    val newPanda = RawPanda(1,"M1B 5K7", "giant", true, Array(0.1, 0.1))
    val pandaPlace = PandaPlace("torronto", Array(newPanda))
    val df =session.createDataFrame(Seq(pandaPlace))
    df
  }
  val df2 = createAndPrintSchemaRawPanda(session)
  df2.show

+--------+--------------------+
|    name|              pandas|
+--------+--------------------+
|torronto|[[1,M1B 5K7,giant...|
+--------+--------------------+


  val pandaInfo = df2.explode(df2("pandas")) {
    case Row(pandas: Seq[Row]) =>
      pandas.map{
        case (Row(
          id: Long,
          zip: String,
          pt: String,
          happy: Boolean,
          attrs: Seq[Double])) => RawPanda(id, zip, pt , happy,      attrs.toArray)
      }
  }

  pandaInfo2.show

+--------+--------------------+---+-------+-----+-----+----------+
|    name|              pandas| id|    zip|   pt|happy|attributes|
+--------+--------------------+---+-------+-----+-----+----------+
|torronto|[[1,M1B 5K7,giant...|  1|M1B 5K7|giant| true|[0.1, 0.1]|
+--------+--------------------+---+-------+-----+-----+----------+

The problem that the explode function as I used it is deprecated, so I would like to recaculate the pandaInfo2 dataframe but using the adviced method in the warning.

use flatMap() or select() with functions.explode() instead

But then when I do :

 val pandaInfo = df2.select(functions.explode(df("pandas"))

I obtain the same result as I had in df2. I don't know how to proceed to use flatMap or functions.explode.

How could I use flatMap or functions.explode to obtain the result that I want ?(the one in pandaInfo)

I've seen this post and this other one but none of them helped me.

1 Answer 1

5

Calling select with explode function returns a DataFrame where the Array pandas is "broken up" into individual records; Then, if you want to "flatten" the structure of the resulting single "RawPanda" per record, you can select the individual columns using a dot-separated "route":

val pandaInfo2 = df2.select($"name", explode($"pandas") as "pandas")
  .select($"name", $"pandas",
    $"pandas.id" as "id",
    $"pandas.zip" as "zip",
    $"pandas.pt" as "pt",
    $"pandas.happy" as "happy",
    $"pandas.attributes" as "attributes"
  )

A less verbose version of the exact same operation would be:

import org.apache.spark.sql.Encoders // going to use this to "encode" case class into schema
val pandaColumns = Encoders.product[RawPanda].schema.fields.map(_.name)

val pandaInfo3 = df2.select($"name", explode($"pandas") as "pandas")
  .select(Seq($"name", $"pandas") ++ pandaColumns.map(f => $"pandas.$f" as f): _*)
Sign up to request clarification or add additional context in comments.

2 Comments

Wow nice, what is an Encoder and how did you know you had to use it ?
Encoder is Spark's way of using reflection to figure out the DataFrame schema appropriate to represent a given class. There are other ways to infer a schema from a class, see stackoverflow.com/q/36746055/5344058 for more options.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.