I am trying to filter a Pyspark dataframe based on a list of tuples of timestamps [(start1, stop1), (start2, stop2), ...]. Each tuple represents a time window. The Pyspark dataframe has the following structure:
+-------------------+------+
|                 ts|   var|
+-------------------+------+
|2018-09-01 20:10:00|     0|
|2018-09-01 20:12:00|     2|
|2018-09-01 20:13:00|     1|
|2018-09-01 20:17:00|     5|
+-------------------+------+
ts is a column of timestamps and var is a column holding a variable of interest. I am looking for an efficient method to filter out all rows that are not within one of the time windows. For example, if my list of time windows consists of a single window [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))], the filtered dataframe should be
+-------------------+------+
|                 ts|   var|
+-------------------+------+
|2018-09-01 20:12:00|     2|
|2018-09-01 20:13:00|     1|
+-------------------+------+
I was able to come up with a working code snippet using a udf and a for-loop which, for each row, iterates over all time windows (see code below). However, looping over all time windows for each row is slow.
Some additional information:
- the sizes and number of time windows are not known in advance, i.e. no hard coding is possible
- the Pyspark dataframe typically has several million rows
- the number of time windows is typically between 100 and 1000
If someone could point out a more efficient solution, I would greatly appreciate it.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
from datetime import datetime
spark = SparkSession.builder.getOrCreate()
# create Pyspark dataframe
data = {'ts': [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
               datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)],
        'var': [0, 2, 1, 5]}
df = spark.createDataFrame(pd.DataFrame(data))
# list of windows [(start1, stop1), (start2, stop2), ...] for filtering
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]
# udf for filtering
def is_in_windows_udf(windows):
    def _is_in_windows(t, windows):
        # linear scan: check t against every (start, stop) pair
        for ts_l, ts_h in windows:
            if ts_l <= t <= ts_h:
                return True
        return False
    return udf(lambda t: _is_in_windows(t, windows), BooleanType())
# perform actual filtering operation
df.where(is_in_windows_udf(windows)(col("ts"))).show()
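
One direction that might reduce the per-row cost of the loop above, sketched here in plain Python and not benchmarked on Spark: sort the windows, merge overlapping ones, and use binary search so that each timestamp needs O(log n) comparisons instead of a scan over every window. The names merge_windows and make_window_lookup are hypothetical helpers introduced only for this illustration, and the sketch assumes closed intervals with start <= stop, matching the ts_l <= t <= ts_h test in the snippet.

```python
from bisect import bisect_right
from datetime import datetime


def merge_windows(windows):
    """Sort windows by start and merge overlapping or touching ones.

    Assumes closed intervals with start <= stop (hypothetical helper).
    """
    merged = []
    for start, stop in sorted(windows):
        if merged and start <= merged[-1][1]:
            # Overlaps or touches the previous window: extend it.
            merged[-1][1] = max(merged[-1][1], stop)
        else:
            merged.append([start, stop])
    return [tuple(w) for w in merged]


def make_window_lookup(windows):
    """Return a function t -> bool backed by binary search (hypothetical helper)."""
    merged = merge_windows(windows)
    starts = [start for start, _ in merged]
    stops = [stop for _, stop in merged]

    def is_in_windows(t):
        # Index of the last merged window whose start is <= t.
        # Merged windows are disjoint, so it is the only candidate.
        i = bisect_right(starts, t) - 1
        return i >= 0 and t <= stops[i]

    return is_in_windows


# Same sample data as in the question, checked without Spark.
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]
is_in_windows = make_window_lookup(windows)
timestamps = [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
              datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)]
kept = [t for t in timestamps if is_in_windows(t)]
```

The returned is_in_windows function takes a single timestamp, so it could presumably be wrapped in udf(..., BooleanType()) in place of the lambda above. Whether that is fast enough for several million rows would need measuring, since the Python udf overhead itself remains.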