
I am struggling to write Spark Scala code that fills in the rows for which coverage is empty, using a self join with conditions.

This is the data:

+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
|  1 | 2020-09-01   |          | 0.128  |
|  1 | 2020-09-03   |        0 | 0.358  |
|  1 | 2020-09-04   |        0 | 0.035  |
|  1 | 2020-09-05   |          |        |
|  1 | 2020-09-06   |          |        |
|  1 | 2020-09-19   |          |        |
|  1 | 2020-09-12   |          |        |
|  1 | 2020-09-18   |          |        |
|  1 | 2020-09-11   |          |        |
|  1 | 2020-09-16   |          |        |
|  1 | 2020-09-21   |       13 | 0.554  |
|  1 | 2020-09-23   |          |        |
|  1 | 2020-09-30   |          |        |
+----+--------------+----------+--------+
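
For reference, here is a minimal sketch that rebuilds this sample DataFrame (the column types are my assumption: coverage as a nullable integer, values as a nullable double, and spark.implicits._ in scope):

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "2020-09-01", None,     Some(0.128)),
  (1, "2020-09-03", Some(0),  Some(0.358)),
  (1, "2020-09-04", Some(0),  Some(0.035)),
  (1, "2020-09-05", None,     None),
  (1, "2020-09-06", None,     None),
  (1, "2020-09-19", None,     None),
  (1, "2020-09-12", None,     None),
  (1, "2020-09-18", None,     None),
  (1, "2020-09-11", None,     None),
  (1, "2020-09-16", None,     None),
  (1, "2020-09-21", Some(13), Some(0.554)),
  (1, "2020-09-23", None,     None),
  (1, "2020-09-30", None,     None)
).toDF("ID", "date_in_days", "coverage", "values")
  // parse the string dates so that ordering by date works as expected
  .withColumn("date_in_days", to_date('date_in_days))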

Expected result:

+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
|  1 | 2020-09-01   |       -1 | 0.128  |
|  1 | 2020-09-03   |        0 | 0.358  |
|  1 | 2020-09-04   |        0 | 0.035  |
|  1 | 2020-09-05   |        0 |        |
|  1 | 2020-09-06   |        0 |        |
|  1 | 2020-09-19   |        0 |        |
|  1 | 2020-09-12   |        0 |        |
|  1 | 2020-09-18   |        0 |        |
|  1 | 2020-09-11   |        0 |        |
|  1 | 2020-09-16   |        0 |        |
|  1 | 2020-09-21   |       13 | 0.554  |
|  1 | 2020-09-23   |       -1 |        |
|  1 | 2020-09-30   |       -1 |        |
+----+--------------+----------+--------+

What I am trying to do:

For each distinct ID (DataFrame partitioned by ID), sorted by date:

For every row whose coverage column is null (call it rowEmptyCoverage):

  1. Find the first row in the DataFrame with date_in_days > rowEmptyCoverage.date_in_days and coverage >= 0. Call it rowFirstDateGreater.
  2. If rowFirstDateGreater.values > 500, set rowEmptyCoverage.coverage to 0; otherwise set it to -1.

I am kind of lost mixing when, join and where... Roughly, the join-based shape I keep circling around looks like the sketch below.
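
(An illustrative sketch only; the variable names are mine, and the threshold is taken literally from the description above:)

import org.apache.spark.sql.functions._
import spark.implicits._

val empty = df.filter('coverage.isNull)
val known = df.filter('coverage.isNotNull)

val repaired = empty.as("e")
  // pair each empty row with every later row of the same ID whose coverage is set
  .join(known.as("k"),
        $"e.ID" === $"k.ID" && $"k.date_in_days" > $"e.date_in_days",
        "left")
  // keep only the earliest such row per empty row
  .groupBy($"e.ID", $"e.date_in_days", $"e.values")
  .agg(min(struct($"k.date_in_days", $"k.values")) as "nxt")
  // derive the new coverage from that row's values (threshold as written above)
  .withColumn("coverage", when($"nxt.values" > 500, lit(0)).otherwise(lit(-1)))
  .select('ID, 'date_in_days, 'coverage, 'values)

val result = known.select('ID, 'date_in_days, 'coverage, 'values)
  .unionByName(repaired)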

  • Can you explain the logic more clearly? An example that illustrates all the cases would be welcome. Commented Oct 6, 2020 at 8:28
  • Yeah, it wasn't clear; I tried to describe it again, let me know :) Commented Oct 6, 2020 at 12:12

1 Answer


I am assuming that you mean values > 0.500 and not values > 500. The logic also remains a little unclear: here I am assuming that you are searching in order of the date_in_days column, not in the order of the rows of the DataFrame.

In any case, we can refine the solution to match your exact need. The overall idea is to use a Window to fetch, for each row, the next row for which the coverage is not null, check whether its values meets the desired criterion, and update coverage accordingly.

It goes as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val win = Window.partitionBy("ID").orderBy("date_in_days")
    .rangeBetween(Window.currentRow, Window.unboundedFollowing)

df
  // creating a struct binding coverage and values together
  .withColumn("cov_str", when('coverage.isNull, lit(null))
                                    .otherwise(struct('coverage, 'values)))
  // finding the first row (starting from the current date, in order of
  // date_in_days) for which the coverage is not null
  .withColumn("next_cov_str", first('cov_str, ignoreNulls = true) over win)
  // updating coverage: we keep the original value if not null, put 0 if values
  // meets the criterion (which you can change) and -1 otherwise
  .withColumn("coverage", coalesce(
             'coverage,
             when($"next_cov_str.values" > 0.500, lit(0)),
             lit(-1)
  ))
  .show(false)
+---+-------------------+--------+------+-----------+------------+
|ID |date_in_days       |coverage|values|cov_str    |next_cov_str|
+---+-------------------+--------+------+-----------+------------+
|1  |2020-09-01 00:00:00|-1      |0.128 |null       |[0, 0.358]  |
|1  |2020-09-03 00:00:00|0       |0.358 |[0, 0.358] |[0, 0.358]  |
|1  |2020-09-04 00:00:00|0       |0.035 |[0, 0.035] |[0, 0.035]  |
|1  |2020-09-05 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-06 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-11 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-12 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-16 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-18 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-19 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-21 00:00:00|13      |0.554 |[13, 0.554]|[13, 0.554] |
|1  |2020-09-23 00:00:00|-1      |null  |null       |null        |
|1  |2020-09-30 00:00:00|-1      |null  |null       |null        |
+---+-------------------+--------+------+-----------+------------+

You may then use drop("cov_str", "next_cov_str") to remove the intermediate columns, but I leave them here for clarity.
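
For example, assuming the chain above (without the final show) is assigned to a hypothetical val result:

result
  .drop("cov_str", "next_cov_str")
  .show(false)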


2 Comments

So this is what I was missing: the window. I was going in circles with joins on dates. I will try this, thanks a lot.
No problem. Let me know if that's not exactly what you wanted; I was still unsure about the exact logic ;)
