
Assume I have the following RDD:

val aSeq = Seq(("a",Seq(("aa",1.0),("bb",2.0),("cc",3.0))),
               ("b",Seq(("aa",3.0),("bb",4.0),("cc",5.0))),
               ("c",Seq(("aa",6.0),("bb",7.0),("cc",8.0))),
               ("d",Seq(("aa",9.0),("bb",10.0),("cc",11.0))))

val anRdd = sc.parallelize(aSeq)

How can I create a DataFrame that uses the first values from the sequence to name and structure the schema? If I transform it to a DF I get the following:

val aDF = anRdd.toDF("id","column2")
aDF.printSchema

root
 |-- id: string (nullable = true)
 |-- column2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: double (nullable = false)
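
Those _1/_2 field names come from Scala's tuple accessors; a short plain-Scala sketch (no Spark needed) of why the defaults look this way:

```scala
// Spark names tuple-derived struct fields after the tuple accessors _1 and _2,
// which is why unnamed tuples surface as generic field names in the schema.
val pair = ("aa", 1.0)
val key = pair._1    // "aa"
val value = pair._2  // 1.0
```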

To be clearer, what I want is something like the following:

root
 |-- id: string (nullable = true)
 |-- column2: struct (nullable = true)
 |    |-- aa: double
 |    |-- bb: double
 |    |-- cc: double

Edit

@eliasah gave a clear answer that produces the desired output. I tried to implement it in my real example, which is more deeply nested. To illustrate, here is an example with one more level of nesting than the first one:

val aSeq = Seq(("a",Seq(("aa",(("aaa",1.0),("bbb",Array(2.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,4.0)))),("cc",(("aaa",4.0),("bbb",Array(9.0,3.0)))))),
               ("b",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))),
               ("c",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))),
               ("d",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))))

val anRddB = sc.parallelize(aSeq)

How can I have a DF with the following schema:

root
 |-- id: string
 |-- column2: struct
 |    |-- aa: struct
 |    |    |-- aaa: double
 |    |    |-- bbb: array
 |    |    |    |-- element: double
 |    |-- bb: struct
 |    |    |-- aaa: double
 |    |    |-- bbb: array
 |    |    |    |-- element: double
 |    |-- cc: struct
 |    |    |-- aaa: double
 |    |    |-- bbb: array
 |    |    |    |-- element: double

How can this be done?

1 Answer

If I understand your question correctly, the solution isn't pretty, but here it is. You'll need to import the struct function:

scala> import org.apache.spark.sql.functions.struct
// import org.apache.spark.sql.functions.struct

scala> val seq = Seq(("a",Seq(("aa",(("aaa",1.0),("bbb",Array(2.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,4.0)))),("cc",(("aaa",4.0),("bbb",Array(9.0,3.0)))))),
           ("b",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))),
           ("c",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))),
           ("d",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))))

scala> val anRdd = sc.parallelize(seq)

Convert your column2 to a Map:

scala> val df = anRdd.map(x => (x._1, x._2.toMap)).toDF("x", "y")
// df: org.apache.spark.sql.DataFrame = [x: string, y: map<string,struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>>]
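
As a plain-Scala illustration (no Spark) of what toMap does to one row's Seq: the first tuple element becomes the key, so the nested values can later be looked up by name instead of by position.

```scala
// One row's Seq of (key, value) pairs, as in the example data above.
val row = Seq(
  ("aa", (("aaa", 1.0), ("bbb", Array(2.0, 2.0)))),
  ("bb", (("aaa", 8.0), ("bbb", Array(3.0, 4.0)))),
  ("cc", (("aaa", 4.0), ("bbb", Array(9.0, 3.0))))
)
val m = row.toMap
// m("bb") now returns the nested value stored under key "bb".
```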

Pull up the first set of fields:

scala> val df2 = df.select($"x".as("id"), struct($"y".getItem("aa").as("aa"),$"y".getItem("bb").as("bb"),$"y".getItem("cc").as("cc")).as("column2"))
// df2: org.apache.spark.sql.DataFrame = [id: string, column2: struct<aa:struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>,bb:struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>,cc:struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>>]

scala> df2.printSchema
// root
//  |-- id: string (nullable = true)
//  |-- column2: struct (nullable = false)
//  |    |-- aa: struct (nullable = true)
//  |    |    |-- _1: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: double (nullable = false)
//  |    |    |-- _2: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: array (nullable = true)
//  |    |    |    |    |-- element: double (containsNull = false)
//  |    |-- bb: struct (nullable = true)
//  |    |    |-- _1: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: double (nullable = false)
//  |    |    |-- _2: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: array (nullable = true)
//  |    |    |    |    |-- element: double (containsNull = false)
//  |    |-- cc: struct (nullable = true)
//  |    |    |-- _1: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: double (nullable = false)
//  |    |    |-- _2: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: array (nullable = true)
//  |    |    |    |    |-- element: double (containsNull = false)

scala> df2.show(false)
// +---+----------------------------------------------------------------------------------------------------------------------------+
// |id |column2                                                                                                                     |
// +---+----------------------------------------------------------------------------------------------------------------------------+
// |a  |[[[aaa,1.0],[bbb,WrappedArray(2.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 4.0)]],[[aaa,4.0],[bbb,WrappedArray(9.0, 3.0)]]]|
// |b  |[[[aaa,1.0],[bbb,WrappedArray(3.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 3.0)]],[[aaa,4.0],[bbb,WrappedArray(3.0, 9.0)]]]|
// |c  |[[[aaa,1.0],[bbb,WrappedArray(3.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 3.0)]],[[aaa,4.0],[bbb,WrappedArray(3.0, 9.0)]]]|
// |d  |[[[aaa,1.0],[bbb,WrappedArray(3.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 3.0)]],[[aaa,4.0],[bbb,WrappedArray(3.0, 9.0)]]]|
// +---+----------------------------------------------------------------------------------------------------------------------------+

Update: To follow up on the question update, I'll use the DataFrame df2 to continue pulling up the nested fields. It is a bit tricky, but here it goes:

val df3 = df2.select(
    $"id",
    struct(
        struct($"column2.aa._1".getItem("_2").as("aaa"),$"column2.aa._2".getItem("_2").as("bbb")).as("aa"),
        struct($"column2.bb._1".getItem("_2").as("aaa"),$"column2.bb._2".getItem("_2").as("bbb")).as("bb"),
        struct($"column2.cc._1".getItem("_2").as("aaa"),$"column2.cc._2".getItem("_2").as("bbb")).as("cc")
    ).as("column2")
)
// df3: org.apache.spark.sql.DataFrame = [id: string, column2: struct<aa:struct<aaa:double,bbb:array<double>>,bb:struct<aaa:double,bbb:array<double>>,cc:struct<aaa:double,bbb:array<double>>>]

There is no magic here; you need a good grasp of struct types and nested types to combine them and get the expected output:

df3.printSchema
// root
//  |-- id: string (nullable = true)
//  |-- column2: struct (nullable = false)
//  |    |-- aa: struct (nullable = false)
//  |    |    |-- aaa: double (nullable = true)
//  |    |    |-- bbb: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
//  |    |-- bb: struct (nullable = false)
//  |    |    |-- aaa: double (nullable = true)
//  |    |    |-- bbb: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
//  |    |-- cc: struct (nullable = false)
//  |    |    |-- aaa: double (nullable = true)
//  |    |    |-- bbb: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)

Note: Tested with spark-shell 2.0


6 Comments

I am maybe mistaken considering your example, but please explain further what kind of relationship should be between your nested fields.
Thanks for the answer. I can't reproduce it. From which library is struct imported?
you should import org.apache.spark.sql.functions.struct
It's correct, but I was unable to implement it in a more 'deep'/nested format. I have updated my question. Have a look if you have any spare time. Thanks :)
Thanks eliasah. I managed yesterday night to do it :) thanks for the help!
