
How can I compare dates across adjacent rows (preceding and next) in a DataFrame? This should happen at a key level.

I have the following data after sorting on key and dates:

source_Df.show()
+-----+--------+------------+------------+
| key | code   | begin_dt   | end_dt     |
+-----+--------+------------+------------+
| 10  |  ABC   | 2018-01-01 | 2018-01-08 |
| 10  |  BAC   | 2018-01-03 | 2018-01-15 |
| 10  |  CAS   | 2018-01-03 | 2018-01-21 |
| 20  |  AAA   | 2017-11-12 | 2018-01-03 |
| 20  |  DAS   | 2018-01-01 | 2018-01-12 |
| 20  |  EDS   | 2018-02-01 | 2018-02-16 |
+-----+--------+------------+------------+

When the rows overlap (i.e. the current row's begin_dt falls between the previous row's begin_dt and end_dt), I need all such rows to share the lowest begin date and the highest end date. Here is the output I need:

final_Df.show()
+-----+--------+------------+------------+
| key | code   | begin_dt   | end_dt     |
+-----+--------+------------+------------+
| 10  |  ABC   | 2018-01-01 | 2018-01-21 |
| 10  |  BAC   | 2018-01-01 | 2018-01-21 |
| 10  |  CAS   | 2018-01-01 | 2018-01-21 |
| 20  |  AAA   | 2017-11-12 | 2018-01-12 |
| 20  |  DAS   | 2017-11-12 | 2018-01-12 |
| 20  |  EDS   | 2018-02-01 | 2018-02-16 |
+-----+--------+------------+------------+

Appreciate any ideas to achieve this. Thanks in advance!

1 Answer

Here's one approach:

  1. Create a new column group_id: null if begin_dt falls within the previous row's date range; otherwise a unique integer
  2. Fill the nulls in group_id with the last non-null value (carrying the last observation forward)
  3. Compute min(begin_dt) and max(end_dt) within each (key, group_id) partition

Example below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // needed for toDF/$ outside spark-shell (assumes a SparkSession named spark)

val df = Seq(
  (10, "ABC", "2018-01-01", "2018-01-08"),
  (10, "BAC", "2018-01-03", "2018-01-15"),
  (10, "CAS", "2018-01-03", "2018-01-21"),
  (20, "AAA", "2017-11-12", "2018-01-03"),
  (20, "DAS", "2018-01-01", "2018-01-12"),
  (20, "EDS", "2018-02-01", "2018-02-16")
).toDF("key", "code", "begin_dt", "end_dt")

val win1 = Window.partitionBy($"key").orderBy($"begin_dt", $"end_dt")
val win2 = Window.partitionBy($"key", $"group_id")

df.
  // step 1: group_id is null when this row's begin_dt falls inside the
  // previous row's [begin_dt, end_dt]; otherwise it gets a unique id
  withColumn("group_id", when(
      $"begin_dt".between(lag($"begin_dt", 1).over(win1), lag($"end_dt", 1).over(win1)), null
    ).otherwise(monotonically_increasing_id)
  ).
  // step 2: fill the nulls with the last non-null group_id seen so far
  withColumn("group_id", last($"group_id", ignoreNulls=true).
      over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  // step 3: take min(begin_dt) and max(end_dt) per (key, group_id)
  withColumn("begin_dt2", min($"begin_dt").over(win2)).
  withColumn("end_dt2", max($"end_dt").over(win2)).
  orderBy("key", "begin_dt", "end_dt").
  show
// +---+----+----------+----------+-------------+----------+----------+
// |key|code|  begin_dt|    end_dt|     group_id| begin_dt2|   end_dt2|
// +---+----+----------+----------+-------------+----------+----------+
// | 10| ABC|2018-01-01|2018-01-08|1047972020224|2018-01-01|2018-01-21|
// | 10| BAC|2018-01-03|2018-01-15|1047972020224|2018-01-01|2018-01-21|
// | 10| CAS|2018-01-03|2018-01-21|1047972020224|2018-01-01|2018-01-21|
// | 20| AAA|2017-11-12|2018-01-03| 455266533376|2017-11-12|2018-01-12|
// | 20| DAS|2018-01-01|2018-01-12| 455266533376|2017-11-12|2018-01-12|
// | 20| EDS|2018-02-01|2018-02-16| 455266533377|2018-02-01|2018-02-16|
// +---+----+----------+----------+-------------+----------+----------+
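To get the exact final_Df from the question, you can assign the chain above to a val (say df2, a hypothetical name, with the trailing show dropped) and keep only the merged range:

// hypothetical follow-up; df2 is the chain above without the final show
val final_Df = df2.select($"key", $"code",
    $"begin_dt2".as("begin_dt"), $"end_dt2".as("end_dt")).
  orderBy("key", "begin_dt", "end_dt")
final_Df.show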

3 Comments

Thanks Leo. You are the best!
I want a column like group_id, but it should start from a specific number and increase by 1. Any suggestions, please?
@Lux, unfortunately monotonically_increasing_id only guarantees generating unique, increasing numbers. To have more control over the generated numbers, you would have to consider using a Window function without a partition (which doesn't scale) or converting to an RDD for its zipWithIndex method, as shown in this SO link.
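For reference, here's a minimal sketch of the zipWithIndex route mentioned in the comment above. It assumes the grouped DataFrame is named df2 and that the ids should start at 100; both the name and the starting value are hypothetical:

val startAt = 100L  // hypothetical starting number

// index the distinct group_id values in a deterministic order, then join back
val mapping = df2.select($"group_id").distinct.rdd.
  map(_.getLong(0)).
  sortBy(identity).
  zipWithIndex.
  map { case (gid, idx) => (gid, idx + startAt) }.
  toDF("group_id", "seq_id")

val withSeqId = df2.join(mapping, "group_id")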
