I'm having a difficult time finding a good way to filter a Spark Dataset. I've described the basic problem below:

  1. For every key, check if there is a statusCode === UV.
  2. If there is no UV status code associated with that key, ignore that key completely.
    • Please note: there should only ever be one UV for each key.
  3. If there is a UV, search for the closest OA event that comes after the UV timestamp.
    • Please note: there could be multiple OA events after the UV timestamp. I want the one closest to the UV timestamp.
  4. If the only OA event is in the past (i.e. before the UV), I still want to keep that record, because an expected OA will come in later. In that case I still want to capture a row with a status code of OA, but with the timestamp value replaced by null.

Input

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|OA        |2019-05-24 14:46:00|
|AAAAAABBBBB|VD        |2019-05-31 19:31:00|
|AAAAAABBBBB|VA        |2019-06-26 00:00:00|
|AAAAAABBBBB|E         |2019-06-26 02:00:00|
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
|AAAAAABBBBB|EE        |2019-07-03 01:00:00|
+-----------+----------+-------------------+

Expected Output

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
+-----------+----------+-------------------+

I know I could likely get partway there by setting up the data like this, but does anyone have a suggestion on how to implement the above filter?

someDS
  .groupBy("key")
  .pivot("statusCode", Seq("UV", "OA"))
  .agg(collect_set($"statusTimestamp"))
  .thenSomeOtherStuff...
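
For reference, my (unverified) understanding is that this pivot would collapse each key to a single row with the matching timestamps gathered into arrays, e.g. for the input above:

+-----------+---------------------+------------------------------------------+
|key        |UV                   |OA                                        |
+-----------+---------------------+------------------------------------------+
|AAAAAABBBBB|[2019-06-29 00:00:00]|[2019-05-24 14:46:00, 2019-07-01 00:00:00]|
+-----------+---------------------+------------------------------------------+

(collect_set gives no ordering guarantee, so the OA array order may vary.) The part I'm stuck on is turning those arrays back into the filtered rows.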

1 Answer

While the groupBy/pivot approach would group the timestamps nicely, it would require non-trivial steps (most likely a UDF) to perform the necessary filtering followed by re-expansion. Here's a different approach with the following steps:

  1. Filter the dataset for statusCode "UV" or "OA" only
  2. For each row, use Window functions to build a String of the statusCode values from the previous row, the current row, and the next two rows
  3. Use Regex pattern matching to identify the wanted rows

Sample code below:

import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Sample data:
//   key `A`: requirement #3
//   key `B`: requirement #2
//   key `C`: requirement #4  
val df = Seq(
  ("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
  ("A", "E",  Timestamp.valueOf("2019-05-30 00:00:00")),
  ("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
  ("B", "C",  Timestamp.valueOf("2019-06-15 00:00:00")),
  ("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
  ("C", "D",  Timestamp.valueOf("2019-06-01 00:00:00")),
  ("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
  ("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
).toDF("key", "statusCode", "statusTimestamp")

// Window over each key's events, ordered by event time
val win = Window.partitionBy("key").orderBy("statusTimestamp")

val df2 = df.
  where($"statusCode" === "UV" || $"statusCode" === "OA").
  // Build a `prev#curr#next1#next2` marker of statusCodes for each row;
  // missing neighbors at the partition edges become empty strings
  withColumn("statusPrevCurrNext2", concat(
    coalesce(lag($"statusCode", 1).over(win), lit("")),
    lit("#"),
    $"statusCode",
    lit("#"),
    coalesce(lead($"statusCode", 1).over(win), lit("")),
    lit("#"),
    coalesce(lead($"statusCode", 2).over(win), lit(""))
  ))

Let's look at df2 (result of steps 1 and 2):

df2.show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |B  |OA        |2019-06-25 00:00:00|#OA##              |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            | <-- Req #4: Ends with `#UV#`
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            | <-- Req #4: Ends with `#UV##`
// |A  |OA        |2019-05-20 00:00:00|#OA#UV#OA          |
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        | <-- Req #3: Starts with `[^#]*#UV#`
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          | <-- Req #3: starts with `UV#`
// |A  |OA        |2019-07-03 00:00:00|OA#OA##            |
// +---+----------+-------------------+-------------------+

Applying step 3, where the regex's three alternatives match (a) any UV row (the marker's second token is `UV`), (b) the OA row immediately after a UV (the marker starts with `UV#`), and (c) rows where the UV is the last event for the key (the marker ends with `#UV` followed only by `#`s):

df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  orderBy("key", "statusTimestamp").
  show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        |
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            |
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            |
// +---+----------+-------------------+-------------------+
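
Note that this keeps the helper column, and for the requirement-#4 case it leaves the pre-UV OA timestamp as-is rather than nulling it. A minimal follow-up sketch (untested, building on df2 above) that handles both:

val result = df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  // Requirement #4: a kept OA row whose marker ends with `#UV#+` precedes the
  // UV with no later OA, so replace its timestamp with null
  withColumn("statusTimestamp",
    when($"statusCode" === "OA" && $"statusPrevCurrNext2".rlike(".*#UV#+$"),
      lit(null).cast("timestamp")
    ).otherwise($"statusTimestamp")).
  drop("statusPrevCurrNext2")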