I am trying to add null columns to an embedded array[struct] column, so that I can transform a similarly complex column:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  val  my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x=>new Element2(x.income,x.currency,new Additional2(x.additional.id,x.additional.item_value,null))).seq
  })
  sparkSession.sqlContext.udf.register("transformElements",my_uDF)
  val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.additional. For this reason I map the field with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema, the 'myElements' field shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

And I am trying to convert into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
  • Could you not just have used .withColumn on your DF? Why go through the route of a UDF and case classes? Commented Jul 8, 2019 at 21:50
  • Because I start with a table <year: Int, f1: Element, f2: Element2>. f2 has this extra field only from 2019, and f1 is populated only before that year, so I need to make a union like this: (select f2 from table where year >= 2019 union select f1 from table where year < 2019), but obviously these structs do not match. For this reason I thought of including 'select transform(f1) ...' to force them to have the same structure. Commented Jul 9, 2019 at 7:44
  • Just another approach: if both schemas are the same except that one has an additional column, you can read the first table with the additional field, take its schema, and read the second table with that schema. Spark will put null in any field that is defined in the schema but has no data. I would avoid the UDF route as much as possible for the sake of speed. Commented Jul 9, 2019 at 17:06
  • Thanks Aaron. This 'union' query is mandatory because it will be issued from a BI tool; it cannot be Spark code. A UDF, however, can be used from this tool. Commented Jul 9, 2019 at 18:08
  • Wait, are you invoking the UDF from your BI tool? Did I understand that correctly? Commented Jul 9, 2019 at 18:12
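
Since the comments above ask for a plain SQL query and for avoiding a UDF where possible, here is a minimal sketch of one way to widen the struct with built-in SQL functions only. It assumes Spark 2.4 or later (where the `transform` higher-order function is available) and is meant to be pasted into spark-shell or a similar REPL. The local session and the one-row `entity` temp view are hypothetical stand-ins for the real table; the column and field names are taken from the question.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; the question uses an existing sparkSession.
val spark = SparkSession.builder().master("local[1]").appName("transform-sketch").getOrCreate()
import spark.implicits._

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)

// Hypothetical stand-in for the `entity` table from the question.
Seq(
  (Seq(Element("150000", "EUR", Additional("001", "500EUR"))), "1")
).toDF("myElements", "line_number").createOrReplaceTempView("entity")

// Rebuild every array element with an extra, always-null string field.
val widened = spark.sql("""
  SELECT transform(myElements, x -> named_struct(
           'income', x.income,
           'currency', x.currency,
           'additional', named_struct(
             'id', x.additional.id,
             'item_value', x.additional.item_value,
             'extra2', CAST(NULL AS STRING)))) AS myElements,
         line_number
  FROM entity
  WHERE line_number = '1'
""")

widened.printSchema()
```

Because the whole conversion is a SQL expression, it could in principle sit inside one branch of the union described above. One caveat of this sketch: a null array element or a null `additional` struct comes back as a struct whose fields are all null, not as null itself.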

2 Answers

Here is another approach that uses Datasets instead of DataFrames, which gives direct access to the objects instead of going through Row. One extra method, asElement2, transforms an Element into an Element2.

case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)

case class Additional(id: String, item_value: String)
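case class Element(income: String, currency: String, additional: Additional) {
  // Declared without parentheses so that the `_.asElement2` calls below
  // compile cleanly on newer Scala versions as well.
  def asElement2: Element2 = {
    val additional2 = Additional2(additional.id, additional.item_value, null)
    Element2(income, currency, additional2)
  }
}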

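// Provides toDS() and the encoders used by map; assumes the session is named `spark`
// (spark-shell imports this automatically).
import spark.implicits._

val df = Seq(
  (Seq(Element("150000", "EUR", Additional("001", "500EUR")))),
  (Seq(Element("50000", "CHF", Additional("002", "1000CHF"))))
).toDS()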

df.map{
  se => se.map{_.asElement2} 
}

//or even simpler
df.map{_.map{_.asElement2}}

Output:

+-------------------------------+
|value                          |
+-------------------------------+
|[[150000, EUR, [001, 500EUR,]]]|
|[[50000, CHF, [002, 1000CHF,]]]|
+-------------------------------+

Final schema:

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional2: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
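
The example above builds the Dataset directly from case classes. When the data already exists as an untyped DataFrame, a sketch along the following lines may work: select the array column, convert it with `.as[Seq[Element]]`, and map as before. The local session, the `raw` DataFrame, and its contents are hypothetical stand-ins; it is written for spark-shell or a similar REPL and assumes no null arrays, null elements, or null `additional` structs (any of these would throw inside `map`).

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only.
val spark = SparkSession.builder().master("local[1]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)
case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional) {
  def asElement2: Element2 =
    Element2(income, currency, Additional2(additional.id, additional.item_value, null))
}

// Hypothetical untyped input with the same column name as in the question.
val raw = Seq(
  (Seq(Element("150000", "EUR", Additional("001", "500EUR"))), "1")
).toDF("myElements", "line_number")

val converted = raw
  .select($"myElements")      // keep only the array column
  .as[Seq[Element]]           // view each row as a typed Seq[Element]
  .map(_.map(_.asElement2))   // widen every element
  .toDF("myElements")         // restore the original column name

converted.printSchema()
```

Note that the nested field is named `additional2` here because that is how `Element2` is declared in this answer.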

1 Comment

I like your answer. It is somehow clearer for me what is going on. Thanks!

It's easier to simply perform the necessary transformations of the nested row elements in the DataFrame with map and rename the column via toDF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

val df = Seq(
  (Seq(Element("70k", "US", Additional("1", "101")), Element("90k", "US", Additional("2", "202")))),
  (Seq(Element("80k", "US", Additional("3", "303"))))
).toDF("myElements")

val df2 = df.map{ case Row(s: Seq[Row] @unchecked) => s.map{
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }}
}.toDF("myElements")

df2.show(false)
// +--------------------------------------------+
// |myElements                                  |
// +--------------------------------------------+
// |[[70k, US, [1, 101,]], [90k, US, [2, 202,]]]|
// |[[80k, US, [3, 303,]]]                      |
// +--------------------------------------------+

df2.printSchema
// root
//  |-- myElements: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- income: string (nullable = true)
//  |    |    |-- currency: string (nullable = true)
//  |    |    |-- additional: struct (nullable = true)
//  |    |    |    |-- id: string (nullable = true)
//  |    |    |    |-- item_value: string (nullable = true)
//  |    |    |    |-- extra2: string (nullable = true)

If for some reason a UDF is preferred, the required transformations are essentially the same:

val myUDF = udf((s: Seq[Row]) => s.map{
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }
})

val df2 = df.select(myUDF($"myElements").as("myElements"))
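
Since the question calls the function from a SQL string, the Row-based UDF can also be registered under a name. The following is a minimal sketch for spark-shell or a similar REPL; the local session and the single-row `entity` temp view are hypothetical stand-ins for the real table, and the name `transformElements` is taken from the question. The type patterns (`income: String` and so on) do not match null values, so this sketch assumes no nulls in those fields.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf

// Local session for illustration only.
val spark = SparkSession.builder().master("local[1]").appName("udf-register-sketch").getOrCreate()
import spark.implicits._

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

// Structs arrive in the UDF as Row, so the input is Seq[Row] rather than Seq[Element].
val myUDF = udf((s: Seq[Row]) => s.map {
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }
})

// Make the UDF callable from SQL text.
spark.udf.register("transformElements", myUDF)

// Hypothetical stand-in for the `entity` table.
Seq(
  (Seq(Element("70k", "US", Additional("1", "101"))), "1")
).toDF("myElements", "line_number").createOrReplaceTempView("entity")

val result = spark.sql(
  "SELECT transformElements(myElements) AS myElements, line_number FROM entity WHERE line_number = '1'")

result.printSchema()
```

Taking `Seq[Row]` as the input type is the key difference from the UDF in the question, which declares `Seq[Element]` as its parameter.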
