
Hello, I need some recommendations on this problem. I have this DataFrame:

+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
|_id                     |h                   |inc|op |ts        |webhooks__0__failed_at |webhooks__0__status|webhooks__0__updated_at|webhooks__1__failed_at |webhooks__1__updated_at|webhooks__2__failed_at |webhooks__2__updated_at|webhooks__3__failed_at|webhooks__3__updated_at|webhooks__5__failed_at|webhooks__5__updated_at|
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
|5926115bffecf947d9fdf965|-3783513890158363801|148|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.813|2019-07-25 17:55:39.819|null                   |null                   |null                  |null                   |null                  |null                   |
|5926115bffecf947d9fdf965|-6421919050082865687|151|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.822|2019-07-25 17:55:39.845|null                   |null                   |null                  |null                   |null                  |null                   |
|5926115bffecf947d9fdf965|-1953717027542703837|155|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.873|2019-07-25 17:55:39.878|null                   |null                   |null                  |null                   |null                  |null                   |
|5926115bffecf947d9fdf965|7260191374440479618 |159|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.945|2019-07-25 17:55:39.951|null                   |null                   |null                  |null                   |null                  |null                   |
|57d17de901cc6a6c9e0000ab|-2430099739381353477|131|u  |1564077339|2019-07-25 17:55:39.722|error              |2019-07-25 17:55:39.731|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|4122669520839049341 |30 |u  |1564077341|null                   |listening          |2019-07-25 17:55:41.453|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|4122669520839049341 |30 |u  |1564077341|null                   |listening          |2019-07-25 17:55:41.453|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|-7191334145177061427|60 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.768|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|1897433358396319399 |58 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.767|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|1897433358396319399 |58 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.767|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|58c6d048edbb6e09eb177639|8363076784039152000 |23 |u  |1564077342|null                   |null               |2019-07-25 17:55:42.216|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|-7191334145177061427|60 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.768|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|58c6d048edbb6e09eb177639|8363076784039152000 |23 |u  |1564077342|null                   |null               |2019-07-25 17:55:42.216|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-3790832816225805697|36 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.384|2019-07-25 17:55:46.400|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-1747137668935062717|34 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.385|2019-07-25 17:55:46.398|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-1747137668935062717|34 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.385|2019-07-25 17:55:46.398|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-3790832816225805697|36 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.384|2019-07-25 17:55:46.400|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|6060575882395080442 |63 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.506|2019-07-25 17:55:46.529|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|6060575882395080442 |63 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.506|2019-07-25 17:55:46.529|null                  |null                   |null                  |null                   |
|594e88f1ffecf918a14c143e|736029767610412482  |58 |u  |1564077346|2019-07-25 17:55:46.503|null               |2019-07-25 17:55:46.513|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+

The column names grow in a pattern like this:

webhooks__0__failed_at, webhooks__1__failed_at, ...

Would it be possible to make a new DataFrame that takes the number in the column name as an index and groups the results like this?

Index | webhooks__failed_at     | webhooks__status
0     | null                    | null
0     | null                    | null
0     | 2019-07-25 17:55:39.722 | error
  • What have you tried so far? It is definitely possible: you can make a separate DataFrame taking only the columns required for each index, union them all together, and then group (see the sketch after these comments). If your columns are static it will be easy; for a dynamic number of columns you will need more implementation. Commented Jul 26, 2019 at 1:22
  • I'm thinking of writing a UDF but don't know where to start. Commented Jul 26, 2019 at 14:56
  • @JesusZuñiga What if a webhook failed twice? Commented Jul 28, 2019 at 22:10
  • @KrzysztofAtłasik It just adds another record to the column; it always follows the structure updated_at, failed_at for each index. Commented Jul 28, 2019 at 22:54
  • Can you add more output so that it would be easier to help you? Do you have more fields like webhooks__1__status and so on? When index 0 has the values 2019-07-25 17:55:39.722 and error, will the same values appear at some point for index 1, 2, 3, and so on? Commented Jul 29, 2019 at 14:39
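
A minimal sketch of the per-index select-and-union approach described in the first comment, assuming df is the DataFrame shown above, that the indexes and fields are read off its column names, and Spark 2.3+ for unionByName:

import org.apache.spark.sql.functions._

// Derive the distinct webhook indexes from the column names (0, 1, 2, 3, 5 above)
val indexes = df.columns
  .filter(_.startsWith("webhooks__"))
  .map(_.split("__")(1))
  .distinct

// Build one small frame per index, padding status with a typed null where a
// given index lacks that column, so all schemas line up for the union
val perIndex = indexes.map { i =>
  val status =
    if (df.columns.contains(s"webhooks__${i}__status")) col(s"webhooks__${i}__status")
    else lit(null).cast("string")
  df.select(
    lit(i).as("Index"),
    col(s"webhooks__${i}__failed_at").as("webhooks__failed_at"),
    status.as("webhooks__status"))
}
val stacked = perIndex.reduce(_ unionByName _)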

2 Answers


If your initial data frame is referenced as df with the following schema:

df.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhooks__0__failed_at: string (nullable = true)
 |-- webhooks__0__status: string (nullable = true)
 |-- webhooks__0__updated_at: string (nullable = true)
 |-- webhooks__1__failed_at: string (nullable = true)
 |-- webhooks__1__updated_at: string (nullable = true)
 |-- webhooks__2__failed_at: string (nullable = true)
 |-- webhooks__2__updated_at: string (nullable = true)
 |-- webhooks__3__failed_at: string (nullable = true)
 |-- webhooks__3__updated_at: string (nullable = true)
 |-- webhooks__5__failed_at: string (nullable = true)
 |-- webhooks__5__updated_at: string (nullable = true)

You can regroup all the webhook data into an array of structs simply by manipulating column-name expressions, and you can use the lit Spark function to insert the column-name indexes as values in the resulting dataset.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import df.sparkSession.implicits._

// Separate the webhooks__<index>__<field> columns from the fixed columns
val (webhooks_columns, base_columns) = df.columns.partition(_.startsWith("webhooks"))

// Parse each webhook column name into its (index, field) pair
val parsed_webhooks_columns = webhooks_columns
     .map(_.split("__"))
     .map { case Array(_, idx, f) => (idx, f) }

// The union of all field names seen across indexes (failed_at, status, updated_at)
val all_fields = parsed_webhooks_columns.map(_._2).toSet

// Build one struct per index, padding missing fields with typed nulls so
// every struct ends up with exactly the same schema
val webhooks_structs = parsed_webhooks_columns
    .groupBy(_._1)
    .map { case (idx, cols) =>
      val fields = cols.map(_._2)
      val all_struct_fields =
          Seq(lit(idx).as("index")) ++
          all_fields.map { f =>
            if (fields.contains(f))
                col(s"webhooks__${idx}__${f}").as(f)
            else
                lit(null).cast(StringType).as(f)
          }
      struct(all_struct_fields: _*)
    }.toArray

// Keep the base columns as-is and gather all the structs into one array column
val df_step1 = df.select(base_columns.map(col) ++
    Seq(array(webhooks_structs: _*).as("webhooks")): _*)

Most of the complexity in the code above deals with the fact that you have a varying number of fields depending on the webhook index (index 0 has a status field not found in the other indexes), and you need to ensure all structs have exactly the same columns, with the same types and in the same order, for the transformation to work.

You end up with the following schema:

df_step1.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhooks: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- index: string (nullable = false)
 |    |    |-- failed_at: string (nullable = true)
 |    |    |-- status: string (nullable = true)
 |    |    |-- updated_at: string (nullable = true)

Now you can explode the dataset to split the different webhooks into separate rows:

val df_step2 = df_step1.withColumn("webhook", explode('webhooks)).drop("webhooks")

And you get the following schema:

df_step2.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhook: struct (nullable = false)
 |    |-- index: string (nullable = false)
 |    |-- failed_at: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- updated_at: string (nullable = true)

You can then optionally flatten the dataset to simplify the final schema:

// Promote each field of the webhook struct to a top-level webhook_<field>
// column, then drop the struct itself
val df_step2_flattened = df_step2.schema
       .filter(_.name == "webhook")
       .flatMap(_.dataType.asInstanceOf[StructType])
       .map(f => (s"webhook_${f.name}", 'webhook(f.name)))
       .foldLeft(df_step2) { case (df, (colname, colspec)) => df.withColumn(colname, colspec) }
       .drop("webhook")

At this point you'll probably want to filter out rows with null webhook_updated_at and run whatever aggregation you need.
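
For example (a minimal sketch; the grouping keys and aggregates here are hypothetical placeholders, adjust them to your real question):

val df_final = df_step2_flattened
  .filter('webhook_updated_at.isNotNull)
  .groupBy(col("_id"), col("webhook_index"))
  .agg(
    count('webhook_failed_at).as("failed_count"), // count ignores nulls
    max('webhook_updated_at).as("last_updated_at"))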

Your final schema is now:

df_step2_flattened.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhook_index: string (nullable = false)
 |-- webhook_failed_at: string (nullable = true)
 |-- webhook_status: string (nullable = true)
 |-- webhook_updated_at: string (nullable = true)

This is not the only way to do what you want, but the key benefit of the above approach is that it uses only built-in Spark expressions and functions, and thus can fully leverage the Catalyst engine's optimizations.


3 Comments

I do not know how this will behave in terms of performance, but I think this solution is far more elegant than creating many DataFrames and applying union. Thanks for sharing the approach!
It should be reasonably efficient since it's mostly column-metadata manipulation. The most expensive operation is the explode, and it's necessary to convert the input format into something you can apply relational algebra on.
Man, you are brilliant. Can you give me some recommendations for learning to manipulate DataFrames like this in Scala?

I would advise looping. The example below is basic, but it should help point you in the right direction. It searches for a single column pattern rather than two, but it can be extended to handle multiple different columns and built into a sub-process if need be.

//Build the DataFrame
val inputDF = spark.sql("select 'a' as Column_1, 'value_1' as test_0_value, 'value_2' as test_1_value, 'value_3' as test_2_value, 'value_4' as test_3_value")

//Make my TempDFs
var interimDF = spark.sql("select 'at-at' as column_1")
var actionDF = interimDF
var finalDF = interimDF

//This would be your search and replacement characteristics
val lookForValue = "test"
val replacementName = "test_check"

//Holds the constants
var constantArray = Array("Column_1")
//Builds an array of the column names that match the search pattern
var changeArray = inputDF.columns.toSeq.toDF("Columns").where("Columns rlike '" + lookForValue + "'").rdd.map(x => x.mkString).collect

//Iterator
var iterator = 1

//Need this for below to run commands
var runStatement = Array("")

//Runs until all columns are hit
while(iterator <= changeArray.length) {
  //Adds constants
  runStatement = constantArray
  //Adds the current iteration columns
  runStatement = runStatement ++ Array(changeArray(iterator - 1) + " as " + replacementName)
  //Adds the iteration event
  runStatement = runStatement ++ Array("'" + iterator + "' as Iteration_Number")

  //Runs all the prebuilt commands
  actionDF = inputDF.selectExpr(runStatement:_*)

  //Going from input -> action -> interim <-> final keeps interim and final semi-dynamic and allows vertical and horizontal catalogue keeping in Spark
  interimDF = if(iterator == 1) {
    actionDF
  } else {
    finalDF.union(actionDF) // union replaces the deprecated unionAll
  }
  finalDF = interimDF
  iterator = iterator + 1
}
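
For reference, running this as written should leave finalDF looking roughly like the following (reconstructed from the loop logic above, not captured from a live run):

finalDF.show
+--------+----------+----------------+
|Column_1|test_check|Iteration_Number|
+--------+----------+----------------+
|       a|   value_1|               1|
|       a|   value_2|               2|
|       a|   value_3|               3|
|       a|   value_4|               4|
+--------+----------+----------------+

Each iteration contributes one row: the constant column, the matched column renamed to test_check, and the iteration number.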

