
I need to conditionally modify the value of a nested field in a DataFrame (or create a new field with the nested values). I would prefer to do it without a UDF, and I especially want to avoid RDD/map: the production tables can have several hundred million records, and map at that scale doesn't strike me as efficient or fast.

Below is the test case:


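// spark.implicits._ (needed for toDF and $"...") is already in scope in spark-shell; import it explicitly elsewhere.
import spark.implicits._

case class teste(var testID: Int = 0, var testDesc: String = "", var testValue: String = "")

val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")

// Left join on the nested testID and bring the new testValue alongside the original struct.
// The final withColumn is the part that does not do what I want (see below).
val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
        select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue")).
        withColumn("F2.testValue",$"NewValue")

DFJoined.show()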

This adds a new column, but what I need is for F2.testValue inside the struct to take the value of NewValue when NewValue is 50 or above.
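The code above adds a column instead of updating the struct because withColumn treats its first argument as a literal column name, not as a path into a struct. The following standalone sketch shows this by inspecting the schema afterwards; it assumes a local SparkSession, and the object DottedColumnName and its result() helper are hypothetical names added only so it runs on its own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object DottedColumnName {
  // Declared inside an object (not a method) so Spark can derive an Encoder for it.
  case class teste(testID: Int = 0, testDesc: String = "", testValue: String = "")

  /** Returns the top-level column names and the field names of the F2 struct. */
  def result(): (Seq[String], Seq[String]) = {
    // Hypothetical local session, only for this demonstration.
    val spark = SparkSession.builder().master("local[1]").appName("dotted-name").getOrCreate()
    import spark.implicits._
    try {
      val df = Seq(("A", teste(1, "AAA", "10"))).toDF("F1", "F2")
        .withColumn("NewValue", $"F2.testValue")
        // The first argument is used as a plain column name, so this creates a new top-level
        // column called "F2.testValue" and leaves the testValue field inside F2 untouched.
        .withColumn("F2.testValue", $"NewValue")

      val f2Fields = df.schema("F2").dataType.asInstanceOf[StructType].fieldNames.toSeq
      (df.columns.toSeq, f2Fields)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (columns, f2Fields) = result()
    println(s"top-level columns: ${columns.mkString(", ")}")
    println(s"fields of F2: ${f2Fields.mkString(", ")}")
  }
}
```

The struct keeps its three original fields; the dotted name only ends up as an extra top-level column.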

Original Data:

+---+------------+
| F1|          F2|
+---+------------+
|  A|[1, AAA, 10]|
|  B|[2, BBB, 20]|
|  C|[3, CCC, 30]|
+---+------------+

Desired Result:

+---+------------+
| F1|          F2|
+---+------------+
|  A|[1, AAA, 10]|
|  B|[2, BBB, 50]|
|  C|[3, CCC, 60]|
+---+------------+

2 Answers


Could you please try this.

case class teste(var testID: Int  = 0, var testDesc: String = "", var testValue: String = "")
val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
DFMain.show(false)

+---+------------+
|F1 |F2          |
+---+------------+
|A  |[1, AAA, 10]|
|B  |[2, BBB, 20]|
|C  |[3, CCC, 30]|
+---+------------+

val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")

val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
        select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue")).
        withColumn("F2_testValue",$"NewValue")

DFJoined.show

+---+------------+--------+------------+
| F1|          F2|NewValue|F2_testValue|
+---+------------+--------+------------+
|  A|[1, AAA, 10]|      40|          40|
|  B|[2, BBB, 20]|      50|          50|
|  C|[3, CCC, 30]|      60|          60|
+---+------------+--------+------------+

DFJoined.printSchema

root
 |-- F1: string (nullable = true)
 |-- F2: struct (nullable = true)
 |    |-- testID: integer (nullable = false)
 |    |-- testDesc: string (nullable = true)
 |    |-- testValue: string (nullable = true)
 |-- NewValue: string (nullable = true)
 |-- F2_testValue: string (nullable = true)

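// Outside spark-shell, also add: import org.apache.spark.sql.functions._
// f2_new: pick the new or the old testValue and flatten the struct into a "|"-delimited string.
// f2_new3: split that string back into its three parts and wrap them in a new struct.
DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then concat_ws('|',F2.testID,F2.testDesc,F2_testValue) else concat_ws('|',F2.testID,F2.testDesc,F2.testValue) end ")).
  withColumn("f2_new3",struct(split($"f2_new","[|]")(0),split($"f2_new","[|]")(1),split($"f2_new","[|]")(2) ) ).
  show(false)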

+---+------------+--------+------------+--------+------------+
|F1 |F2          |NewValue|F2_testValue|f2_new  |f2_new3     |
+---+------------+--------+------------+--------+------------+
|A  |[1, AAA, 10]|40      |40          |1|AAA|10|[1, AAA, 10]|
|B  |[2, BBB, 20]|50      |50          |2|BBB|50|[2, BBB, 50]|
|C  |[3, CCC, 30]|60      |60          |3|CCC|60|[3, CCC, 60]|
+---+------------+--------+------------+--------+------------+

f2_new3 is the desired output.

The workaround is needed because the more direct version below does not work:

DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then struct(F2.testID,F2.testDesc,F2_testValue) else struct(F2.testID,F2.testDesc,F2.testValue) end ")).show()
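The direct version most likely fails because the two CASE branches produce structs with different field names (testID, testDesc, F2_testValue versus testID, testDesc, testValue), so Spark cannot resolve them to one common type. One way around both that error and the string round-trip (which turns every field into a string) is to keep the struct's shape fixed and make only testValue conditional. This sketch, not taken from either answer, does it with the DataFrame API using struct, when, and aliases for the field names; it assumes a local SparkSession, and RebuildStruct and result() are hypothetical names added so it runs standalone.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, when}

object RebuildStruct {
  // Declared inside an object (not a method) so Spark can derive an Encoder for it.
  case class teste(testID: Int = 0, testDesc: String = "", testValue: String = "")

  /** Returns (F1, testID, testDesc, testValue) per row after the conditional update, sorted by F1. */
  def result(): Seq[(String, Int, String, String)] = {
    val spark = SparkSession.builder().master("local[1]").appName("rebuild-struct").getOrCreate()
    import spark.implicits._
    try {
      val dfMain = Seq(("A", teste(1, "AAA", "10")), ("B", teste(2, "BBB", "20")), ("C", teste(3, "CCC", "30"))).toDF("F1", "F2")
      val dfNew = Seq(("A", teste(1, "AAA", "40")), ("B", teste(2, "BBB", "50")), ("C", teste(3, "CCC", "60"))).toDF("F1", "F2")

      // Same left join as in the question: keep the original struct and bring the new value alongside it.
      val joined = dfMain
        .join(dfNew, dfMain("F2.testID") === dfNew("F2.testID"), "left")
        .select(dfMain("F1"), dfMain("F2"), dfNew("F2.testValue").as("NewValue"))

      // Rebuild F2 field by field; only testValue is conditional. The aliases keep the
      // original field names, and testID is passed through as an integer.
      val updated = joined
        .withColumn("F2", struct(
          $"F2.testID".as("testID"),
          $"F2.testDesc".as("testDesc"),
          when($"NewValue".cast("int") >= 50, $"NewValue").otherwise($"F2.testValue").as("testValue")))
        .drop("NewValue")

      updated.select($"F1", $"F2.testID", $"F2.testDesc", $"F2.testValue")
        .as[(String, Int, String, String)]
        .collect()
        .sortBy(_._1)
        .toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = result().foreach(println)
}
```

Because the aliases reproduce the original field names and testID is not converted, the rebuilt F2 has the same field names and field types as the original struct.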

1 Comment

Thanks a lot, it's similar to a solution I was trying, and I was able to combine the two and avoid the workaround. Specify the expr as follows: expr(" case when NewValue>=50 then named_struct('F2', named_struct('testID',F2.testID,'testDesc',F2.testDesc,'testValue',NewValue)) else named_struct('F2', named_struct('testID',F2.testID,'testDesc',F2.testDesc,'testValue',F2.testValue)) end")

In addition to stack0114106's answer, I also found this solution to the problem; the two are more or less alike:


val DFFinal = DFJoined.selectExpr("""
        named_struct(
          'F1', F1,
          'F2', named_struct(
            'testID', F2.testID,
            'testDesc', F2.testDesc,
            'testValue', case when NewValue>=50 then NewValue else F2.testValue end
            )
        ) as named_struct
      """).select($"named_struct.F1", $"named_struct.F2")
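On Spark 3.1 or later (an assumption about your version; neither answer uses this API), Column.withField replaces a single field inside a struct without listing the other fields, which keeps the expression short when the struct is wide. A minimal sketch with the question's test data; it assumes a local SparkSession, and WithFieldUpdate and result() are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

object WithFieldUpdate {
  // Declared inside an object (not a method) so Spark can derive an Encoder for it.
  case class teste(testID: Int = 0, testDesc: String = "", testValue: String = "")

  /** Returns (F1, testID, testDesc, testValue) per row after the conditional update, sorted by F1. */
  def result(): Seq[(String, Int, String, String)] = {
    val spark = SparkSession.builder().master("local[1]").appName("with-field").getOrCreate()
    import spark.implicits._
    try {
      val dfMain = Seq(("A", teste(1, "AAA", "10")), ("B", teste(2, "BBB", "20")), ("C", teste(3, "CCC", "30"))).toDF("F1", "F2")
      val dfNew = Seq(("A", teste(1, "AAA", "40")), ("B", teste(2, "BBB", "50")), ("C", teste(3, "CCC", "60"))).toDF("F1", "F2")

      val joined = dfMain
        .join(dfNew, dfMain("F2.testID") === dfNew("F2.testID"), "left")
        .select(dfMain("F1"), dfMain("F2"), dfNew("F2.testValue").as("NewValue"))

      // withField (Spark 3.1+) swaps in a new value for testValue; testID and testDesc are kept as-is.
      val updated = joined
        .withColumn("F2", $"F2".withField("testValue",
          when($"NewValue".cast("int") >= 50, $"NewValue").otherwise($"F2.testValue")))
        .drop("NewValue")

      updated.select($"F1", $"F2.testID", $"F2.testDesc", $"F2.testValue")
        .as[(String, Int, String, String)]
        .collect()
        .sortBy(_._1)
        .toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = result().foreach(println)
}
```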
