This is a PySpark overlapping time period problem.
Sample data:
data = [
(1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
(1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
(1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
(1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
(3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
columns = ["station_id", "start_time", "end_time", "partition_date"]
I am trying to identify overlapping records for the same station_id, based on the start_time and end_time fields. Once the overlaps are identified, I want to keep only the rows with the most recent partition_date and remove the overlapping rows that have an older partition_date.
The intended output would be:
output = [
(1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
(1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
(3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
I have tried several approaches, from joins to window functions, but none of them achieves the desired result: identify the overlapping rows, keep only the most recent of the rows that overlap, remove the rest, and keep all rows that do not overlap. The goal is for any given time range to be covered only once per station_id; for example, 5:00 to 6:00 should have only one record for a given station_id.
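The rule described above can be read in more than one way, so here is one reading of it written as plain Python, only to make it precise. It assumes that intervals are half-open (rows that merely touch, such as 05:30-07:00 and 07:00-10:30, do not overlap), that partition_date is in M/D/YY form, and that a row is dropped only when an overlapping row for the same station_id has a strictly newer partition_date. The helper names (ts, part, superseded, keep_latest) are illustrative and not part of any library.

```python
from datetime import datetime

def ts(s):
    # Parse an ISO 8601 UTC timestamp such as "2024-01-28T05:00:00Z".
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ")

def part(s):
    # Parse a partition date such as "1/24/24" (assumed M/D/YY).
    return datetime.strptime(s, "%m/%d/%y").date()

def superseded(row, other):
    # True when `other` is for the same station, overlaps `row`,
    # and has a strictly newer partition date.
    same_station = row[0] == other[0]
    overlap = ts(row[1]) < ts(other[2]) and ts(other[1]) < ts(row[2])
    newer = part(other[3]) > part(row[3])
    return same_station and overlap and newer

def keep_latest(rows):
    # Keep every row that no other row supersedes.
    return [r for r in rows if not any(superseded(r, o) for o in rows)]

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
kept = keep_latest(data)
```

Under this reading, two overlapping rows with the same partition_date are both kept, so a tie-breaker would still be needed if that case can occur.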
One solution I tried:
# Renamed copy of the DataFrame for the self-join.
dl_ota_airings_df_dup = dl_ota_airings_df_3.selectExpr(
    "station_id as station_id2", "start_time as start_time2", "end_time as end_time2",
    "content_id as content_id2", "partition_date as partition_date2")
# Match rows whose start_time or end_time falls inside the other row's interval.
join_condition = ((dl_ota_airings_df_3["station_id"] == dl_ota_airings_df_dup["station_id2"]) &
    ((dl_ota_airings_df_3["start_time"] >= dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["start_time"] < dl_ota_airings_df_dup["end_time2"]))
    |
    ((dl_ota_airings_df_3["end_time"] <= dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["end_time"] > dl_ota_airings_df_dup["start_time2"])))
# &
# ((dl_ota_airings_df_3["start_time"] != dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["end_time"] != dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["partition_date"] != dl_ota_airings_df_dup["partition_date2"])))
df_overlapping = dl_ota_airings_df_3.join(dl_ota_airings_df_dup, join_condition, "left")
# Keep unmatched rows and rows newer than their match, then drop the helper columns.
dl_ota_airings_df_4 = (df_overlapping
    .filter("station_id2 is null or (partition_date > partition_date2)")
    .drop("station_id2", "start_time2", "end_time2", "content_id2", "partition_date2")
    .dropDuplicates())
This logic always misses some edge cases. For any overlap within a station_id, I want to keep only the one record with the most recent partition_date and remove the rest. Please advise, or point me in the right direction.