I have a DataFrame with start_time and end_time columns. For each observation I want to build a window consisting of the two rows that come before it by end_time, restricted to rows whose end_time is before that observation's start_time.
Example data:
data = [('a', 10, 12, 5),('b', 20, 25, 10),('c', 30, 60, 15),('d', 40, 45, 20),('e', 50, 70, 25)]
df = sqlContext.createDataFrame(data, ['name', 'start_time', 'end_time', 'resource'])
+----+----------+--------+--------+
|name|start_time|end_time|resource|
+----+----------+--------+--------+
| a| 10| 12| 5|
| b| 20| 25| 10|
| c| 30| 60| 15|
| d| 40| 45| 20|
| e| 50| 70| 25|
+----+----------+--------+--------+
So the window for 'e' should include 'b' and 'd', but not 'c', since 'c' ends at 60, which is after 'e' starts at 50.
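To make the target concrete, the result I'm after for the toy data would look something like this (avg taken over at most the two most recent qualifying rows, null where none qualify):
+----+----------+--------+--------+----+
|name|start_time|end_time|resource| avg|
+----+----------+--------+--------+----+
|   a|        10|      12|       5|null|
|   b|        20|      25|      10| 5.0|
|   c|        30|      60|      15| 7.5|
|   d|        40|      45|      20| 7.5|
|   e|        50|      70|      25|15.0|
+----+----------+--------+--------+----+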
Without the restriction of end time < start time, I was able to use
from pyspark.sql import Window
from pyspark.sql import functions as func
window = Window.orderBy("name").rowsBetween(-2, -1)
df.select('*', func.avg("resource").over(window).alias("avg")).show()
I looked into rangeBetween(), but I can't figure out a way to reference the start_time of the current row, or a way to restrict the frame by the end_time of the other rows. There's Window.currentRow, but in this example it would only reference the value of resource.
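For illustration, the closest I can express is something like the following (just a sketch of what I tried). As I understand it, the range bound is interpreted on the orderBy column, so it compares other rows' end_time to the current row's end_time rather than to its start_time, and it also can't cap the frame at two rows:
# the bound applies to end_time (the orderBy column), not to start_time
window = Window.orderBy("end_time").rangeBetween(Window.unboundedPreceding, -1)
df.select('*', func.avg("resource").over(window).alias("avg")).show()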
Is this possible to do using Window? Should I be trying something else entirely?
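One alternative I've sketched (not sure it's the right approach; the l/r aliases and the rn/avg column names below are just placeholders) is a non-equi self-join that keeps only the rows ending before the current row's start, followed by a row_number window to take the two most recent of them:
left = df.alias("l")
right = df.alias("r")

# for each row, keep only the other rows that end before this row starts
candidates = left.join(right, func.col("r.end_time") < func.col("l.start_time"), "left")

# rank the qualifying rows by end_time, most recent first, and keep at most two
w = Window.partitionBy(func.col("l.name")).orderBy(func.col("r.end_time").desc())
recent = candidates.withColumn("rn", func.row_number().over(w)).filter(func.col("rn") <= 2)

# average resource over those (up to) two rows per name
recent.groupBy(func.col("l.name")).agg(func.avg(func.col("r.resource")).alias("avg")).show()
This seems to give the target above on the toy data, but the non-equi join looks expensive; on the real data I'd presumably also need an equality condition on the grouping key to keep it bounded.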
Edit: Using Spark 2.1.1 and Python 2.7+ if it matters.
Comment: partitionBy? Without it you'll end up with all rows in the same partition and on a single executor, which will kill it with large datasets.

Reply: There is a partitionBy and it seems to be working fine -- it's groups of name such that each group has an a, b, c, etc. When I run the code without the restriction of only considering data with end times < the current row's start time, it doesn't seem to have any issues. I'm just having trouble integrating this restriction.