This is my DataFrame in PySpark:

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   5       A
2015-10-13 13:00:00+00:00   6       A
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   11      B

The values of data are cumulative.

I want to get this result (differences between consecutive rows, grouped by feed):

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   4       A
2015-10-13 13:00:00+00:00   1       A  
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   1       B

In pandas I would do it this way:

df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))

How can I do the same thing in PySpark?

2 Answers

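You can do this with the lag function over a window partitioned by feed and ordered by timestamp. The third argument to lag is the default returned when there is no previous row, which plays the role of fill_value=0: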

from pyspark.sql.window import Window
import pyspark.sql.functions as f

window = Window.partitionBy("feed").orderBy("utc_timestamp")

df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(window))
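
To make it concrete what the window expression computes, here is a minimal plain-Python sketch of the same logic applied to the sample data from the question. It is only an illustration of the semantics, not how Spark executes the query; the helper name diff_within_feed and the tuple layout are assumptions made for this example.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (utc_timestamp, data, feed)
rows = [
    ("2015-10-13 11:00:00+00:00", 1, "A"),
    ("2015-10-13 12:00:00+00:00", 5, "A"),
    ("2015-10-13 13:00:00+00:00", 6, "A"),
    ("2015-10-13 14:00:00+00:00", 10, "B"),
    ("2015-10-13 15:00:00+00:00", 11, "B"),
]


def diff_within_feed(rows):
    """Subtract the previous row's value within each feed (hypothetical helper)."""
    # Sorting by (feed, timestamp) mirrors partitionBy("feed").orderBy("utc_timestamp").
    ordered = sorted(rows, key=lambda r: (r[2], r[0]))
    result = []
    for _, group in groupby(ordered, key=itemgetter(2)):
        previous = 0  # plays the role of lag's default for the first row of a feed
        for ts, value, feed in group:
            result.append((ts, value - previous, feed))
            previous = value
    return result


result = diff_within_feed(rows)
for row in result:
    print(row)
```

The data column of the printed rows should match the desired result in the question: the first row of each feed keeps its value, and every later row holds the difference from the row before it.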

You can use lag as a substitute for shift, and coalesce(..., F.lit(0)) as a substitute for fill_value=0:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("feed").orderBy("utc_timestamp")

data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
# withColumn returns a new DataFrame, so assign the result
df = df.withColumn("data", data)

1 Comment

You can avoid coalesce by passing 0 as the default value to lag: lag(col, offset=1, default=0)
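
One caveat worth keeping in mind: the two approaches agree on the sample data, but they can differ if data contains nulls. The default of lag is used only when no previous row exists in the partition, whereas coalesce replaces every null, including a null that was read from the previous row. The plain-Python sketch below models that difference; the function names and the sample values are hypothetical and stand in for the Spark expressions.

```python
def lag_with_default(values, default=0):
    """Model of lag(col, 1, default): the default applies only to the first row."""
    return [default if i == 0 else values[i - 1] for i in range(len(values))]


def lag_then_coalesce(values, fallback=0):
    """Model of coalesce(lag(col), lit(fallback)): the fallback replaces every null."""
    lagged = [None if i == 0 else values[i - 1] for i in range(len(values))]
    return [fallback if v is None else v for v in lagged]


# Hypothetical partition, already ordered by timestamp, with one missing reading.
values = [1, None, 6]

by_default = lag_with_default(values)    # previous-row null is passed through
by_coalesce = lag_then_coalesce(values)  # previous-row null becomes 0

print(by_default)
print(by_coalesce)
```

If data is never null, as in the question, both forms give the same result and the lag default is the shorter option.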
