I am trying to filter a Pyspark dataframe based on a list of tuples of timestamps [(start1, stop1), (start2, stop2), ...]. Each tuple represents a time window. The Pyspark dataframe has the following structure:

+-------------------+------+
|                 ts|   var|
+-------------------+------+
|2018-09-01 20:10:00|     0|
|2018-09-01 20:12:00|     2|
|2018-09-01 20:13:00|     1|
|2018-09-01 20:17:00|     5|
+-------------------+------+

ts is a column of timestamps and var is a column of a variable of interest. I am looking for an efficient method to filter out all rows which are not within one of the time windows. For example, if my list of time windows consists of a single window [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))], the filtered dataframe should be

+-------------------+------+ 
|                 ts|   var| 
+-------------------+------+ 
|2018-09-01 20:12:00|     2| 
|2018-09-01 20:13:00|     1|
+-------------------+------+ 

I was able to come up with a working code snippet using a udf and a for-loop which, for each row, iterates over all time windows (see code below). However, looping over all time windows for every row is slow.

Some additional information:

  • the sizes and number of time windows are not known in advance, i.e. no hard-coding is possible
  • the Pyspark dataframe typically has several million rows
  • the number of time windows is typically between 100 and 1000

If someone could point out a more efficient solution, I would greatly appreciate it.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

# create Pyspark dataframe
data = {'ts': [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
               datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)],
         'var': [0, 2, 1, 5]}
df = spark.createDataFrame(pd.DataFrame(data))

# list of windows [(start1, stop1), (start2, stop2), ...] for filtering
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]

# udf for filtering
def is_in_windows_udf(windows):
    def _is_in_windows(t, windows):
        # return True as soon as t falls inside any window
        for ts_l, ts_h in windows:
            if ts_l <= t <= ts_h:
                return True
        # only return False after all windows have been checked
        return False
    return udf(lambda t: _is_in_windows(t, windows), BooleanType())

# perform actual filtering operation
df.where(is_in_windows_udf(windows)(col("ts"))).show()

1 Answer

A simpler solution is to filter the dataframe once per window and union the results; because every union operates on the same dataset, Spark can parallelize the execution as well:

from pyspark.sql import functions as F

for count, item in enumerate(windows):
    # rows whose timestamp falls inside the current window
    window_filter = (F.col("ts") >= item[0]) & (F.col("ts") <= item[1])
    if count == 0:
        result = df.filter(window_filter)
    else:
        result = result.union(df.filter(window_filter))