
This is a PySpark overlapping-time-period problem.

Sample data

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

columns = ["station_id", "start_time", "end_time", "partition_date"]

I am trying to identify overlapping records based on the start_time and end_time fields for the same station_id. Once identified, I want to keep only the rows with the most recent partition_date and remove the overlapping rows that have the older partition_date.

The intended output would be:

output = [
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

I have tried a few approaches, from joins to window functions, but none achieves the desired result: identify overlapping rows, keep only the most recent overlapping rows while removing the rest, and keep all rows that do not overlap at all. The goal is to count a given time span for a station_id only once, so, for example, 5:00 to 6:00 should have only one record per station_id.
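In plain Python, the rule I'm after looks like this (a sketch; `dedupe_overlaps` is a hypothetical helper name): intervals that overlap transitively are merged into one chain per station, and within each chain only the rows carrying the most recent partition_date survive.

```python
from datetime import datetime

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

def dedupe_overlaps(rows):
    """Keep only the latest-partition rows within each chain of overlapping intervals."""
    by_station = {}
    for r in sorted(rows, key=lambda r: (r[0], r[1])):  # ISO strings sort chronologically
        by_station.setdefault(r[0], []).append(r)
    kept = []
    for station, srows in by_station.items():
        groups, max_end = [], None
        for r in srows:
            if max_end is None or r[1] >= max_end:
                groups.append([])  # starts at/after everything seen so far: new chain
            groups[-1].append(r)
            max_end = r[2] if max_end is None else max(max_end, r[2])
        for g in groups:
            latest = max(datetime.strptime(r[3], "%m/%d/%y") for r in g)
            kept.extend(r for r in g if datetime.strptime(r[3], "%m/%d/%y") == latest)
    return kept
```

Running this on the sample data produces exactly the intended output above; the question is how to express the same chain-grouping in PySpark.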

One solution I tried:

dl_ota_airings_df_dup = dl_ota_airings_df_3.selectExpr(
    "station_id as station_id2", "start_time as start_time2",
    "end_time as end_time2", "content_id as content_id2",
    "partition_date as partition_date2")

# Note: & binds tighter than |, so the two overlap clauses need their own
# parentheses; otherwise the station_id match only applies to the first clause.
join_condition = (
    (dl_ota_airings_df_3["station_id"] == dl_ota_airings_df_dup["station_id2"])
    & (
        ((dl_ota_airings_df_3["start_time"] >= dl_ota_airings_df_dup["start_time2"])
         & (dl_ota_airings_df_3["start_time"] < dl_ota_airings_df_dup["end_time2"]))
        |
        ((dl_ota_airings_df_3["end_time"] <= dl_ota_airings_df_dup["end_time2"])
         & (dl_ota_airings_df_3["end_time"] > dl_ota_airings_df_dup["start_time2"]))
    )
    # &
    # ((dl_ota_airings_df_3["start_time"] != dl_ota_airings_df_dup["start_time2"])
    #  & (dl_ota_airings_df_3["end_time"] != dl_ota_airings_df_dup["end_time2"])
    #  & (dl_ota_airings_df_3["partition_date"] != dl_ota_airings_df_dup["partition_date2"]))
)

df_overlapping = dl_ota_airings_df_3.join(dl_ota_airings_df_dup, join_condition, "left")

dl_ota_airings_df_4 = (df_overlapping
    .filter("station_id2 is null or (partition_date > partition_date2)")
    .drop("station_id2", "start_time2", "end_time2", "content_id2", "partition_date2")
    .dropDuplicates())

This logic always misses some edge cases: for any overlap within a station_id, I want to keep only the records with the most recent partition_date and remove the rest. Please advise, or point me in the right direction.

1 Answer

You can try the code below to achieve this:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy("station_id").orderBy("start_time")

result_df = (df
    .withColumn("prev_end_time", F.lag("end_time").over(windowSpec))
    .withColumn("overlap",
                F.when(F.col("start_time") < F.col("prev_end_time"), True).otherwise(False))
    .withColumn("max_partition_date",
                F.max("partition_date").over(Window.partitionBy("station_id")))
    .filter((~F.col("overlap")) | (F.col("partition_date") == F.col("max_partition_date")))
    .select("station_id", "start_time", "end_time", "partition_date"))

1 Comment

There could be multiple rows that overlap with one another, and we have to take the latest. This does not do that, as it only looks for overlap with the immediately previous row.
