
I have a Spark DataFrame that consists of a series of dates:

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd

rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True),
                     StructField('ANI', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

What I want to do is find the duration by subtracting StartDateTime from EndDateTime. I figured I'd try to do this with a function:

# Function to calculate time delta
def time_delta(y,x): 
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta

# create new RDD and add new column 'Duration' by applying time_delta function
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime)) 

However, this just gives me:

>>> df2.show()
ID  EndDateTime          StartDateTime        ANI            Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... sip:4534454450 null    
X02 2014-02-13T12:35:... 2014-02-13T12:32:... sip:6413445440 null    
X03 2014-02-13T12:36:... 2014-02-13T12:32:... sip:4534437492 null    
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... sip:6474454453 null    
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... sip:8874458555 null  

I'm not sure whether my approach is correct. If it isn't, I'd gladly accept another suggested way to achieve this.

7 Comments
  • Have you tried debugging in the REPL? Commented May 18, 2015 at 4:46
  • @dskrvk I don't have much experience debugging since I'm not a developer. However, I suspect the issue is in how Spark hands off data to functions. For example, time_delta() works in pure Python. For some reason, certain Python/Pandas functions just don't play nice. E.g. import re def extract_ani(x): extract = x.str.extract(r'(\d{10})') return extract Dates = Dates.withColumn('Cell', extract_ani(Dates.ANI)) also errors out with Spark DataFrames, but works when I convert the dataframe to an RDD and use the function as part of a sc.map Commented May 18, 2015 at 23:54
  • In Scala I would use TimestampType instead of StringType to hold the dates, and then create a UDF to calculate the difference between the two columns. I don't see anywhere that you declare time_delta to be user defined function, but that's a required step in Scala to make it do what you are trying to do. Commented May 19, 2015 at 1:39
  • Yeah, take a look at spark.apache.org/docs/latest/api/python/… under pyspark.sql.functions.udf. You need to create time_delta as a UDF. Commented May 19, 2015 at 2:07
  • @David Griffin you were right :) I initially disregarded registering UDFs, as I believed you only had to register UDFs if you wanted to use them in a select expression. Commented May 19, 2015 at 2:46

7 Answers

61

As of Spark 1.5 you can use unix_timestamp:

from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
            - F.unix_timestamp('StartDateTime', format=timeFmt))
df = df.withColumn("Duration", timeDiff)

Note the Java-style (SimpleDateFormat) time format.

>>> df.show()
+---+--------------------+--------------------+--------+
| ID|         EndDateTime|       StartDateTime|Duration|
+---+--------------------+--------------------+--------+
|X01|2014-02-13T12:36:...|2014-02-13T12:31:...|     258|
|X02|2014-02-13T12:35:...|2014-02-13T12:32:...|     204|
|X03|2014-02-13T12:36:...|2014-02-13T12:32:...|     228|
|XO4|2014-02-13T12:37:...|2014-02-13T12:32:...|     269|
|XO5|2014-02-13T12:36:...|2014-02-13T12:33:...|     202|
+---+--------------------+--------------------+--------+
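
One caveat: unix_timestamp returns whole seconds, so the milliseconds in these strings are discarded. If you need sub-second precision, a minimal sketch (assuming Spark 2.2+ for to_timestamp; this is not part of the original answer) is to parse to a timestamp and cast to double:

from pyspark.sql import functions as F

timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
# Casting TimestampType to double gives seconds since the epoch including
# the fractional part, so the subtraction keeps millisecond precision.
df = df.withColumn(
    "DurationSec",
    F.to_timestamp("EndDateTime", timeFmt).cast("double")
    - F.to_timestamp("StartDateTime", timeFmt).cast("double"))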

1 Comment

You can divide by 3600.0 to convert to hours: df.withColumn("Duration_hours", df.Duration / 3600.0)
16

Thanks to David Griffin. Here's how to do this for future reference.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf

# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# define timedelta function (obtain duration in seconds)
def time_delta(y,x): 
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end-start).total_seconds()
    return delta

# register as a UDF 
f = udf(time_delta, IntegerType())

# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime)) 

Applying time_delta() will give you duration in seconds:

>>> df2.show()
ID  EndDateTime          StartDateTime        Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... 258     
X02 2014-02-13T12:35:... 2014-02-13T12:32:... 204     
X03 2014-02-13T12:36:... 2014-02-13T12:32:... 228     
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... 268     
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... 202 
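
On later Spark releases the same UDF can be written with the decorator form of udf (a sketch assuming Spark 2.1+, where udf accepts a returnType keyword and works as a decorator; not part of the original answer). DoubleType matches what total_seconds() actually returns:

from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(returnType=DoubleType())
def time_delta_udf(y, x):
    # total_seconds() returns a float, so declare DoubleType, not IntegerType
    fmt = '%Y-%m-%dT%H:%M:%S.%f'
    return (datetime.strptime(y, fmt) - datetime.strptime(x, fmt)).total_seconds()

df2 = df.withColumn('Duration', time_delta_udf(df.EndDateTime, df.StartDateTime))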

2 Comments

Please use (end-start).total_seconds(). Otherwise you get nasty surprises like this: time_delta('2014-02-13T12:36:14.000', '2014-02-13T12:36:15.900') returns 86398 instead of -1.9.
This code doesn't work any more. The duration comes out as null. Using Zeppelin, Spark 1.6.
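
The 86398 surprise in the first comment comes from timedelta.seconds, which is a non-negative field of the normalized timedelta, whereas total_seconds() is the true signed duration. A quick illustration (constructed for this edit, not from the original comment):

from datetime import datetime

fmt = '%Y-%m-%dT%H:%M:%S.%f'
d = (datetime.strptime('2014-02-13T12:36:14.000', fmt)
     - datetime.strptime('2014-02-13T12:36:15.900', fmt))
print(d)                  # -1 day, 23:59:58.100000 (normalized form)
print(d.seconds)          # 86398 -- the seconds field, never negative
print(d.total_seconds())  # -1.9  -- the actual signed duration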
14
datediff(Column end, Column start)

Returns the number of days from start to end.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
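
A minimal usage sketch (assuming a Spark 2.x+ session named spark; the column names are illustrative, and datediff implicitly casts 'yyyy-MM-dd' strings to dates):

from pyspark.sql import functions as F

df = spark.createDataFrame([('2014-02-15', '2014-02-13')],
                           ['end_date', 'start_date'])
df.select(F.datediff('end_date', 'start_date').alias('days')).show()
# +----+
# |days|
# +----+
# |   2|
# +----+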


2

This can be done in Spark SQL by converting the string date to a timestamp and then taking the difference.

1: Convert to timestamp:

CAST(UNIX_TIMESTAMP(MY_COL_NAME,'dd-MMM-yy') as TIMESTAMP)

2: Get the difference between dates using datediff function.

Combined, these become a nested expression:

spark.sql("select COL_1, COL_2, datediff( CAST( UNIX_TIMESTAMP( COL_1,'dd-MMM-yy') as TIMESTAMP), CAST( UNIX_TIMESTAMP( COL_2,'dd-MMM-yy') as TIMESTAMP) ) as LAG_in_days from MyTable")

Below is the result:

+---------+---------+-----------+
|    COL_1|    COL_2|LAG_in_days|
+---------+---------+-----------+
|24-JAN-17|16-JAN-17|          8|
|19-JAN-05|18-JAN-05|          1|
|23-MAY-06|23-MAY-06|          0|
|18-AUG-06|17-AUG-06|          1|
+---------+---------+-----------+

Reference: https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL
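
A hypothetical DataFrame-API equivalent of the SQL above (column names taken from the example):

from pyspark.sql import functions as F

df = df.withColumn(
    'LAG_in_days',
    F.datediff(F.unix_timestamp('COL_1', 'dd-MMM-yy').cast('timestamp'),
               F.unix_timestamp('COL_2', 'dd-MMM-yy').cast('timestamp')))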


1

Use DoubleType instead of IntegerType: time_delta() returns a float from total_seconds(), and declaring the UDF with a mismatched IntegerType is what makes the duration come out as null on newer Spark versions.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.sql.functions import udf


# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# define timedelta function (obtain duration in seconds)
def time_delta(y,x): 
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end-start).total_seconds()
    return delta

# register as a UDF 
f = udf(time_delta, DoubleType())

# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime))
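
With DoubleType the fractional seconds survive. For reference, the durations worked out by hand from the sample data (a sketch of the expectation, not captured output):

df2.select('ID', 'Duration').show()
# Expected: X01 -> 258.023, X02 -> 204.084, X03 -> 228.596,
#           XO4 -> 268.579, XO5 -> 202.398  (seconds)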


0

Here is a working version for Spark 2.x, derived from Jason's answer:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.types import StringType, StructType, StructField

sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()

rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# register as a UDF 
from datetime import datetime
sqlContext.registerFunction("time_delta", lambda y,x:(datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')-datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')).total_seconds())

df.createOrReplaceTempView("Test_table")

spark.sql("SELECT ID,EndDateTime,StartDateTime,time_delta(EndDateTime,StartDateTime) as time_delta FROM Test_table").show()

sc.stop()
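
Note that sqlContext.registerFunction was later deprecated in favor of spark.udf.register. An equivalent registration with an explicit return type (a sketch, assuming Spark 2.x):

from datetime import datetime
from pyspark.sql.types import DoubleType

fmt = '%Y-%m-%dT%H:%M:%S.%f'
spark.udf.register(
    "time_delta",
    lambda y, x: (datetime.strptime(y, fmt)
                  - datetime.strptime(x, fmt)).total_seconds(),
    DoubleType())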


0

Spark 4.0+ has timestamp_diff

timestamp_diff('unit', 'col_start', 'col_end')

Gets the difference between the timestamps in the specified units by truncating the fraction part.


First, convert strings into timestamps. Then, calculate differences.

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('2026-01-01T01:01:01.721', '2025-01-01T01:01:01.323')],
    ['EndDateTime', 'StartDateTime'])

df = df.withColumns({
    c: F.to_timestamp(c, "yyyy-MM-dd'T'HH:mm:ss.SSS")
    for c in ['EndDateTime', 'StartDateTime']
})

df = df.withColumns({
    'diff_year': F.timestamp_diff('year', 'StartDateTime', 'EndDateTime'),
    'diff_quarter': F.timestamp_diff('quarter', 'StartDateTime', 'EndDateTime'),
    'diff_month': F.timestamp_diff('month', 'StartDateTime', 'EndDateTime'),
    'diff_week': F.timestamp_diff('week', 'StartDateTime', 'EndDateTime'),
    'diff_day': F.timestamp_diff('day', 'StartDateTime', 'EndDateTime'),
    'diff_hour': F.timestamp_diff('hour', 'StartDateTime', 'EndDateTime'),
    'diff_minute': F.timestamp_diff('minute', 'StartDateTime', 'EndDateTime'),
    'diff_second': F.timestamp_diff('second', 'StartDateTime', 'EndDateTime'),
    'diff_millisecond': F.timestamp_diff('millisecond', 'StartDateTime', 'EndDateTime'),
    'diff_microsecond': F.timestamp_diff('microsecond', 'StartDateTime', 'EndDateTime'),
})

df.show(truncate=0, vertical=True)
-RECORD 0-----------------------------------
 EndDateTime      | 2026-01-01 01:01:01.721 
 StartDateTime    | 2025-01-01 01:01:01.323 
 diff_year        | 1                       
 diff_quarter     | 4                       
 diff_month       | 12                      
 diff_week        | 52                      
 diff_day         | 365                     
 diff_hour        | 8760                    
 diff_minute      | 525600                  
 diff_second      | 31536000                
 diff_millisecond | 31536000398             
 diff_microsecond | 31536000398000          
