
I want Spark to avoid creating two separate window stages for the same window object used twice in my code.

How can I use it only once in the following example, and tell Spark to do both the sum and the division under a single window?

df = df.withColumn("colum_c", 
            f.sum(f.col("colum_a")).over(window) /
            f.sum(f.col("colum_b")).over(window))

Example:

days = lambda i: (i - 1) * 86400

window = (
    Window()
    .partitionBy(f.col("account_id"))
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)

df.withColumn(
    "fea_fuel_consumption_ratio_petrol_diesel_01w",
    (
        f.sum(f.col("fea_fuel_consumption_petrol")).over(window)
        / f.sum(f.col("fea_fuel_consumption_diesel")).over(window)
    ),
).show(1000, False)

Spark created two different windows.

  • There might be some optimization based on your specific use case. Could you tell us what window is and show sample input/output, please? Commented May 10, 2020 at 6:55
  • Okay, adding that. Commented May 10, 2020 at 6:56

1 Answer


You could use collect_list over a single window, and then use the higher-order function aggregate to get your desired result (sum/sum).

df.show() #sample data

#+----------+--------+--------+----------+
#|account_id|column_a|column_b|event_date|
#+----------+--------+--------+----------+
#|         1|      90|      23| 2019-2-23|
#|         1|      45|      12| 2019-2-28|
#|         1|      80|      38| 2019-3-21|
#|         1|      62|      91| 2019-3-24|
#|         2|      21|      11| 2019-3-29|
#|         2|      57|      29| 2019-1-08|
#|         2|      68|      13| 2019-1-12|
#|         2|      19|      14| 2019-1-14|
#+----------+--------+--------+----------+

from pyspark.sql import functions as f
from pyspark.sql.window import Window

days = lambda i: i * 86400

window = (
    Window()
    .partitionBy(f.col("account_id"))
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)

# Collect (column_a, column_b) pairs once over the window, then reduce the
# collected array twice with aggregate: sum of first elements / sum of second.
df.withColumn("column_c", f.collect_list(f.array("column_a", "column_b")).over(window)) \
  .withColumn(
      "column_c",
      f.expr("""aggregate(column_c, 0, (acc, x) -> int(x[0]) + acc)
                / aggregate(column_c, 0, (acc, x) -> int(x[1]) + acc)"""),
  ).show()

#+----------+--------+--------+----------+------------------+
#|account_id|column_a|column_b|event_date|          column_c|
#+----------+--------+--------+----------+------------------+
#|         1|      90|      23| 2019-2-23|3.9130434782608696|
#|         1|      45|      12| 2019-2-28| 3.857142857142857|
#|         1|      80|      38| 2019-3-21|2.1052631578947367|
#|         1|      62|      91| 2019-3-24|1.1007751937984496|
#|         2|      57|      29| 2019-1-08|1.9655172413793103|
#|         2|      68|      13| 2019-1-12|2.9761904761904763|
#|         2|      19|      14| 2019-1-14|2.5714285714285716|
#|         2|      21|      11| 2019-3-29|1.9090909090909092|
#+----------+--------+--------+----------+------------------+

As you can see in the physical plan, with this method there is only one windowspecdefinition / specifiedwindowframe, hence only one window is used.

.explain()

== Physical Plan ==
*(2) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(aggregate(column_c#6838, 0, lambdafunction((cast(lambda x#6846[0] as int) + lambda acc#6845), lambda acc#6845, lambda x#6846, false), lambdafunction(lambda id#6847, lambda id#6847, false)) as double) / cast(aggregate(column_c#6838, 0, lambdafunction((cast(lambda x#6849[1] as int) + lambda acc#6848), lambda acc#6848, lambda x#6849, false), lambdafunction(lambda id#6850, lambda id#6850, false)) as double)) AS column_c#6844]
+- Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, collect_list(_w1#6857, 0, 0) windowspecdefinition(account_id#4848L, _w0#6856L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -518400, currentrow$())) AS column_c#6838], [account_id#4848L], [_w0#6856L ASC NULLS FIRST]
   +- Sort [account_id#4848L ASC NULLS FIRST, _w0#6856L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(account_id#4848L, 200), [id=#1554]
         +- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6856L, array(column_a#4850L, column_b#4849L) AS _w1#6857]
            +- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]
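As a side note, if you are on Spark 3.1 or later (an assumption; this answer predates that release), the same single-window trick can be written with the Python-side higher-order function pyspark.sql.functions.aggregate instead of f.expr. A minimal sketch:

from pyspark.sql import functions as f

# Sketch assuming Spark >= 3.1, where f.aggregate exists in the Python API.
# Same idea as above: one collect_list window, then sum/sum over the pairs.
df_ratio = (
    df.withColumn("pairs", f.collect_list(f.array("column_a", "column_b")).over(window))
      .withColumn(
          "column_c",
          f.aggregate("pairs", f.lit(0).cast("long"), lambda acc, x: acc + x[0].cast("long"))
          / f.aggregate("pairs", f.lit(0).cast("long"), lambda acc, x: acc + x[1].cast("long")),
      )
      .drop("pairs")
)

Since only the collect_list runs over the window, this should compile to the same single windowspecdefinition.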

Compare this with the original expression, which creates two windows:

df.withColumn("colum_c",f.sum(f.col("column_a")).over(window)\
                              /f.sum(f.col("column_b")).over(window)).show()

In this physical plan, we can see two instances of windowspecdefinition / specifiedwindowframe, hence two windows are used.

.explain()

== Physical Plan ==
Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(sum(column_a#4850L) windowspecdefinition(account_id#4848L, _w0#6804L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double) / cast(sum(column_b#4849L) windowspecdefinition(account_id#4848L, _w0#6804L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double)) AS colum_c#6798], [account_id#4848L], [_w0#6804L ASC NULLS FIRST]
+- Sort [account_id#4848L ASC NULLS FIRST, _w0#6804L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(account_id#4848L, 200), [id=#1453]
      +- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6804L]
         +- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]
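If you want to check this programmatically rather than by eyeballing explain() output, one rough option is to count windowspecdefinition occurrences in the plan string. Note this relies on _jdf and queryExecution, which are internal PySpark APIs and may change between versions:

def count_window_specs(df):
    # Uses PySpark internals (_jdf, queryExecution): private API, version-dependent.
    plan = df._jdf.queryExecution().executedPlan().toString()
    return plan.count("windowspecdefinition")

one = df.withColumn("column_c", f.collect_list(f.array("column_a", "column_b")).over(window))
two = df.withColumn("colum_c", f.sum("column_a").over(window) / f.sum("column_b").over(window))
print(count_window_specs(one), count_window_specs(two))  # expected: 1 2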

1 Comment

If fea_fuel_consumption_petrol is not an integer, can you show what it looks like (how many decimal places max) and I will update.
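(For non-integer columns, a hedged sketch of what that update could look like: accumulate as double so the aggregate's accumulator type matches the values, regardless of decimal places.)

df.withColumn("column_c", f.collect_list(f.array("column_a", "column_b")).over(window)) \
  .withColumn(
      "column_c",
      # Hypothetical variant for double/decimal inputs: start the accumulator
      # at cast(0 as double) and cast each element to double before adding.
      f.expr("""aggregate(column_c, cast(0 as double), (acc, x) -> acc + double(x[0]))
                / aggregate(column_c, cast(0 as double), (acc, x) -> acc + double(x[1]))"""),
  ).show()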
