
I would like to know whether it is possible, using PySpark, to calculate the time difference of a dataset by group. For example, I have

CODE1 | CODE2  | TIME 
00001 |  AAA   | 2019-01-01 14:00:00
00001 |  AAA   | 2019-01-01 14:05:00
00001 |  AAA   | 2019-01-01 14:10:00
00001 |  BBB   | 2019-01-01 14:15:00
00001 |  BBB   | 2019-01-01 14:20:00
00001 |  AAA   | 2019-01-01 14:25:00
00001 |  AAA   | 2019-01-01 14:30:00

What I would like is something like

CODE1 | CODE2  | TIME_DIFF
00001 |  AAA   | 10 MINUTES 
00001 |  BBB   | 5 MINUTES
00001 |  AAA   | 5 MINUTES

The time difference is from the last record to the first one within each consecutive run of the same category. The data is already sorted by time. Is this possible?

1 Answer

I have coded it with a fairly straightforward approach; the steps below could be condensed further using more of Spark's built-in functions (a shorter sketch follows at the end).
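
For reference, the sample DataFrame used below can be built like this (a minimal sketch, assuming an active SparkSession named spark; TIME is deliberately kept as a string, matching the schema printed further down):

>>> rows = [(1, 'AAA', '2019-01-01 14:00:00'),
...         (1, 'AAA', '2019-01-01 14:05:00'),
...         (1, 'AAA', '2019-01-01 14:10:00'),
...         (1, 'BBB', '2019-01-01 14:15:00'),
...         (1, 'BBB', '2019-01-01 14:20:00'),
...         (1, 'AAA', '2019-01-01 14:25:00'),
...         (1, 'AAA', '2019-01-01 14:30:00')]
>>> df = spark.createDataFrame(rows, ['CODE1', 'CODE2', 'TIME'])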

>>> df.show()
+-----+-----+-------------------+
|CODE1|CODE2|               TIME|
+-----+-----+-------------------+
|    1|  AAA|2019-01-01 14:00:00|
|    1|  AAA|2019-01-01 14:05:00|
|    1|  AAA|2019-01-01 14:10:00|
|    1|  BBB|2019-01-01 14:15:00|
|    1|  BBB|2019-01-01 14:20:00|
|    1|  AAA|2019-01-01 14:25:00|
|    1|  AAA|2019-01-01 14:30:00|
+-----+-----+-------------------+

>>> df.printSchema()
root
 |-- CODE1: long (nullable = true)
 |-- CODE2: string (nullable = true)
 |-- TIME: string (nullable = true)

>>> from pyspark.sql import functions as F, Window
>>> win = Window.partitionBy(F.lit(0)).orderBy('TIME')
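#NOTE: partitioning by a constant pulls every row into a single window partition;
#on real data, Window.partitionBy('CODE1').orderBy('TIME') would scale better.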

#batch_order labels each consecutive run of CODE2 in timestamp order (a gaps-and-islands pattern)
>>> df_1=df.withColumn('prev_batch', F.lag('CODE2').over(win)) \
...   .withColumn('flag', F.when(F.col('CODE2') == F.col('prev_batch'), 0).otherwise(1)) \
...   .withColumn('batch_order', F.sum('flag').over(win)) \
...   .drop('prev_batch', 'flag') \
...   .sort('TIME')

>>> df_1.show()
+-----+-----+-------------------+-----------+
|CODE1|CODE2|               TIME|batch_order|
+-----+-----+-------------------+-----------+
|    1|  AAA|2019-01-01 14:00:00|          1|
|    1|  AAA|2019-01-01 14:05:00|          1|
|    1|  AAA|2019-01-01 14:10:00|          1|
|    1|  BBB|2019-01-01 14:15:00|          2|
|    1|  BBB|2019-01-01 14:20:00|          2|
|    1|  AAA|2019-01-01 14:25:00|          3|
|    1|  AAA|2019-01-01 14:30:00|          3|
+-----+-----+-------------------+-----------+

#Extract min and max timestamps for each group
>>> df_max=df_1.groupBy([df_1.batch_order,df_1.CODE2]).agg(F.max("TIME").alias("mx"))
>>> df_min=df_1.groupBy([df_1.batch_order,df_1.CODE2]).agg(F.min("TIME").alias("mn"))
>>> df_max.show()
+-----------+-----+-------------------+
|batch_order|CODE2|                 mx|
+-----------+-----+-------------------+
|          1|  AAA|2019-01-01 14:10:00|
|          2|  BBB|2019-01-01 14:20:00|
|          3|  AAA|2019-01-01 14:30:00|
+-----------+-----+-------------------+

>>> df_min.show()
+-----------+-----+-------------------+
|batch_order|CODE2|                 mn|
+-----------+-----+-------------------+
|          1|  AAA|2019-01-01 14:00:00|
|          2|  BBB|2019-01-01 14:15:00|
|          3|  AAA|2019-01-01 14:25:00|
+-----------+-----+-------------------+

#join on batch_order
>>> df_joined=df_max.join(df_min,df_max.batch_order==df_min.batch_order)
>>> df_joined.show()
+-----------+-----+-------------------+-----------+-----+-------------------+
|batch_order|CODE2|                 mx|batch_order|CODE2|                 mn|
+-----------+-----+-------------------+-----------+-----+-------------------+
|          1|  AAA|2019-01-01 14:10:00|          1|  AAA|2019-01-01 14:00:00|
|          3|  AAA|2019-01-01 14:30:00|          3|  AAA|2019-01-01 14:25:00|
|          2|  BBB|2019-01-01 14:20:00|          2|  BBB|2019-01-01 14:15:00|
+-----------+-----+-------------------+-----------+-----+-------------------+
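
As a side note, the self-join above duplicates the batch_order and CODE2 columns; both endpoints can instead come from a single aggregation, e.g.:

>>> df_bounds = df_1.groupBy('batch_order', 'CODE2') \
...                 .agg(F.min('TIME').alias('mn'), F.max('TIME').alias('mx'))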


>>> from pyspark.sql.functions import unix_timestamp
>>> from pyspark.sql.types import IntegerType
#difference between the max and min timestamp
>>> df_joined.withColumn("diff",((unix_timestamp(df_joined.mx, 'yyyy-MM-dd HH:mm:ss')-unix_timestamp(df_joined.mn, 'yyyy-MM-dd HH:mm:ss'))/60).cast(IntegerType())).show()
+-----------+-----+-------------------+-------------------+----+
|batch_order|CODE2|                 mx|                 mn|diff|
+-----------+-----+-------------------+-------------------+----+
|          1|  AAA|2019-01-01 14:10:00|2019-01-01 14:00:00|  10|
|          3|  AAA|2019-01-01 14:30:00|2019-01-01 14:25:00|   5|
|          2|  BBB|2019-01-01 14:20:00|2019-01-01 14:15:00|   5|
+-----------+-----+-------------------+-------------------+----+
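
For completeness, here is the condensed variant hinted at above (a sketch under the same assumptions: CODE1 is the natural partition key and TIME is a string in 'yyyy-MM-dd HH:mm:ss' format). It labels each run and computes the minute difference in one pass, with no self-join:

>>> w = Window.partitionBy('CODE1').orderBy('TIME')
>>> diff_min = ((F.unix_timestamp(F.max('TIME'), 'yyyy-MM-dd HH:mm:ss')
...              - F.unix_timestamp(F.min('TIME'), 'yyyy-MM-dd HH:mm:ss')) / 60)
>>> (df
...    # flag rows that start a new run of CODE2, then cumulatively sum the
...    # flags so every consecutive run gets its own label
...    .withColumn('flag', F.when(F.lag('CODE2').over(w) == F.col('CODE2'), 0).otherwise(1))
...    .withColumn('batch_order', F.sum('flag').over(w))
...    # a single aggregation yields both endpoints of each run
...    .groupBy('CODE1', 'CODE2', 'batch_order')
...    .agg(diff_min.cast('int').alias('TIME_DIFF'))
...    .orderBy('batch_order')
...    .show())

On the sample data this should yield 10, 5 and 5 minutes for the three runs, matching the table above.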