I'm having a hard time finding a good way to filter a Spark Dataset. The basic problem is described below:
- For every key, check whether there is a row with statusCode === UV.
- If there is no UV status code for that key, ignore that key completely.
- Please note: there should only ever be one UV per key.
- If there is a UV, find the closest OA event after the UV timestamp.
- Please note: there could be multiple OA events after the UV timestamp; I want the one closest to the UV timestamp.
- If the only OA events are in the past (i.e. before the UV), I still want to keep the key, because the expected OA will arrive later. In that case, still emit a row with a statusCode of OA but replace the timestamp with null.
Input
+-----------+----------+-------------------+
|key |statusCode|statusTimestamp |
+-----------+----------+-------------------+
|AAAAAABBBBB|OA |2019-05-24 14:46:00|
|AAAAAABBBBB|VD |2019-05-31 19:31:00|
|AAAAAABBBBB|VA |2019-06-26 00:00:00|
|AAAAAABBBBB|E |2019-06-26 02:00:00|
|AAAAAABBBBB|UV |2019-06-29 00:00:00|
|AAAAAABBBBB|OA |2019-07-01 00:00:00|
|AAAAAABBBBB|EE |2019-07-03 01:00:00|
+-----------+----------+-------------------+
Expected Output
+-----------+----------+-------------------+
|key |statusCode|statusTimestamp |
+-----------+----------+-------------------+
|AAAAAABBBBB|UV |2019-06-29 00:00:00|
|AAAAAABBBBB|OA |2019-07-01 00:00:00|
+-----------+----------+-------------------+
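To make the intended semantics concrete, here is the same per-key logic over plain Scala collections. This is not Spark, purely a reference for the expected behaviour; `Event` and `filterKey` are names I made up for the sketch.

```scala
// Reference implementation of the per-key rules in plain Scala collections.
// Timestamps are kept as "yyyy-MM-dd HH:mm:ss" strings, which for this fixed
// format compare chronologically under ordinary string comparison.
case class Event(key: String, statusCode: String, statusTimestamp: String)

def filterKey(events: Seq[Event]): Seq[Event] =
  events.find(_.statusCode == "UV") match {
    case None => Seq.empty // no UV: drop the key entirely
    case Some(uv) =>
      val closestOaAfter = events
        .filter(e => e.statusCode == "OA" && e.statusTimestamp > uv.statusTimestamp)
        .sortBy(_.statusTimestamp) // earliest OA after the UV = closest to it
        .headOption
      closestOaAfter match {
        case Some(oa) => Seq(uv, oa)
        // only past OAs (or none): keep a placeholder OA row, null timestamp
        case None => Seq(uv, Event(uv.key, "OA", null))
      }
  }
```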
I know I could likely solve the problem by reshaping the data like this, but does anyone have a suggestion for implementing the filter described above?
someDS
.groupBy("key")
.pivot("statusCode", Seq("UV", "OA"))
.agg(collect_set($"statusTimestamp"))
.thenSomeOtherStuff...
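One direction I've been considering (an untested sketch, not a definitive implementation): use a window function to attach each key's UV timestamp to every row, then assemble the UV row, the closest later OA row, and a null-timestamp placeholder where no later OA exists. The function name `pickUvAndNextOa` is made up; column names match the tables above, and statusTimestamp is assumed to be a "yyyy-MM-dd HH:mm:ss" string (adjust the null cast for a real timestamp column).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def pickUvAndNextOa(someDS: DataFrame): DataFrame = {
  val byKey = Window.partitionBy("key")

  // Broadcast each key's UV timestamp to all rows of that key.
  // There is at most one UV per key, so max() simply picks it.
  val withUv = someDS.withColumn(
    "uvTs",
    max(when(col("statusCode") === "UV", col("statusTimestamp"))).over(byKey))

  // Keys with no UV at all disappear here.
  val uvRows = withUv.filter(col("statusCode") === "UV")

  // Earliest (i.e. closest) OA strictly after the UV timestamp, one per key.
  val oaAfter = withUv
    .filter(col("statusCode") === "OA" && col("statusTimestamp") > col("uvTs"))
    .withColumn("rn",
      row_number().over(Window.partitionBy("key").orderBy("statusTimestamp")))
    .filter(col("rn") === 1)
    .drop("rn")

  // Keys whose only OA events precede the UV: emit a placeholder OA row
  // with a null timestamp, per the last rule above.
  val placeholderOa = uvRows
    .join(oaAfter.select("key"), Seq("key"), "left_anti")
    .select(
      col("key"),
      lit("OA").as("statusCode"),
      lit(null).cast("string").as("statusTimestamp"),
      col("uvTs"))

  uvRows.unionByName(oaAfter).unionByName(placeholderOa).drop("uvTs")
}
```

The left_anti join is what detects "UV present but no OA after it": it keeps UV rows for exactly those keys that produced no row in `oaAfter`.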