
I have a DataFrame with start_time and end_time columns. I want to set up windows, with each observation's window being the two rows before it (ordered by end_time), restricted to rows whose end_time is before that observation's start_time.

Example data:

data = [('a', 10, 12, 5),('b', 20, 25, 10),('c', 30, 60, 15),('d', 40, 45, 20),('e', 50, 70, 25)]
df = sqlContext.createDataFrame(data, ['name', 'start_time', 'end_time', 'resource'])
+----+----------+--------+--------+
|name|start_time|end_time|resource|
+----+----------+--------+--------+
|   a|        10|      12|       5|
|   b|        20|      25|      10|
|   c|        30|      60|      15|
|   d|        40|      45|      20|
|   e|        50|      70|      25|
+----+----------+--------+--------+

So the window for 'e' should include 'b' and 'd', but not 'c' (c's end_time of 60 comes after e's start_time of 50).

Without the restriction of end time < start time, I was able to use

from pyspark.sql import Window        
from pyspark.sql import functions as func
window = Window.orderBy("name").rowsBetween(-2, -1)
df.select('*', func.avg("resource").over(window).alias("avg")).show()

I looked into rangeBetween(), but I can't figure out a way to reference the start_time of the current row, or to specify that I want to restrict the frame by the end_time of the other rows. There's Window.currentRow, but in this example it would only reference the value of resource.

Is this possible to do using Window? Should I be trying something else entirely?

Edit: Using Spark 2.1.1 and Python 2.7+ if it matters.

  • What's the partitionBy? Without it you'll end up with all rows in the same partition and on a single executor, which will kill it with large datasets. Commented Jul 14, 2017 at 1:19
  • Yeah, my actual data is pretty large, so I have a partitionBy that seems to be working fine; it's groups of names such that each group has an a, b, c, etc. (see the sketch just below). When I run the code without the restriction of only considering data with end times < current row start time, it doesn't seem to have any issues. I'm just having trouble integrating this restriction. Commented Jul 14, 2017 at 18:55
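
A minimal sketch of what that partitioned version of the window above could look like, assuming a hypothetical group column that is not in the sample data:

from pyspark.sql import Window
from pyspark.sql import functions as func

# "group" is a hypothetical column; the sample data above does not include it
window = Window.partitionBy("group").orderBy("name").rowsBetween(-2, -1)
df.select('*', func.avg("resource").over(window).alias("avg")).show()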

2 Answers


You can actually use the groupBy function for aggregation over different partitions and then inner join the output DataFrames on a common key. partitionBy/window functions take a lot of time in Spark, so it's better to use groupBy instead if you can.
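
The answer above is high-level; here is a minimal PySpark sketch of one way the join idea could be filled in for this question (an illustration, not the original answerer's code; the top-2 selection below still uses a small per-name window):

from pyspark.sql import Window
from pyspark.sql import functions as func

# Pair each row ("cur") with every earlier row ("prev") whose end_time is
# before the current row's start_time.
cur = df.alias("cur")
prev = df.alias("prev")
pairs = cur.join(prev, func.col("prev.end_time") < func.col("cur.start_time"), "inner")

# Keep only the two most recent qualifying rows per current row, then aggregate.
w = Window.partitionBy(func.col("cur.name")).orderBy(func.col("prev.end_time").desc())
result = (pairs
          .withColumn("rn", func.row_number().over(w))
          .filter(func.col("rn") <= 2)
          .groupBy(func.col("cur.name"))
          .agg(func.avg(func.col("prev.resource")).alias("avg")))
result.show()

For 'e' this averages the resources of 'd' and 'b' (the two latest rows ending before its start_time of 50); rows with no qualifying earlier rows, such as 'a', drop out of the inner join.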



I don't think this is possible purely using windows. From a given row, you need to be able to work in reverse sort order back through prior rows until you have two hits which satisfy your condition.

You could use a window function to create a list of all previous values encountered for each row, and then a UDF with some pure Scala/Python to determine the aggregate, accounting for your exclusions.

In scala:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// every preceding row (by end_time), back to the start of the partition
val window = Window.partitionBy(???).orderBy("end_time").rowsBetween(Long.MinValue, -1)

// given the current row's start_time and the collected structs, apply the exclusion rule and compute the aggregate
val udfWithSelectionLogic = udf { (startTime: Long, values: Seq[Row]) => INSERT_LOGIC_HERE_TO_CALCULATE_AGGREGATE }

val dataPlus = data.withColumn("combined", struct($"start_time", $"end_time", $"resource"))
        .withColumn("collected", collect_list($"combined") over window)
        .withColumn("result", udfWithSelectionLogic($"start_time", $"collected"))

This isn't ideal, but might be helpful.
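
Since the question uses Python, here is a rough PySpark sketch of the same collect-then-filter idea; the "group" partition column is a stand-in for the ??? placeholder above, and the UDF body is one possible reading of the selection logic:

from pyspark.sql import Window
from pyspark.sql import functions as func
from pyspark.sql.types import DoubleType

def resource_avg(start_time, earlier):
    # Keep earlier rows that ended before this row started, take the two
    # most recent of them, and average their resource values.
    hits = sorted([r for r in (earlier or []) if r.end_time < start_time],
                  key=lambda r: r.end_time, reverse=True)[:2]
    return float(sum(r.resource for r in hits)) / len(hits) if hits else None

resource_avg_udf = func.udf(resource_avg, DoubleType())

# "group" is a hypothetical partition column, standing in for ??? above
window = (Window.partitionBy("group").orderBy("end_time")
                .rowsBetween(Window.unboundedPreceding, -1))

result = (df
          .withColumn("combined", func.struct("start_time", "end_time", "resource"))
          .withColumn("collected", func.collect_list("combined").over(window))
          .withColumn("avg", resource_avg_udf(func.col("start_time"), func.col("collected"))))
result.show()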

