I have two DataFrames of different lengths. The first looks like this:
A | Time_Stop | B
------ | ----------------------|-----
Green | 2016-10-01 00:10:15 | 77
Yellow | 2016-10-03 00:11:15 | 80
Blue | 2016-10-04 00:12:15 | 6
The second looks like this:
D | Time_Start | Z
------ | ----------------------|-----
Foo | 2016-10-01 00:12:15 | 7
Cookie | 2016-10-03 00:45:15 | 99
My goal is to return only the rows from the first DataFrame whose timestamp falls within a certain time limit of a row in the second (let's say within 5 minutes), so the output should look something like this:
A | Time_Stop | B
------ | ----------------------|-----
Green | 2016-10-01 00:10:15 | 77
I am having trouble figuring this out. So far I have tried this:
```python
from pyspark.sql import functions as F

timeFmt = "yyyy-MM-dd' 'HH:mm:ss"
result = df.where(F.unix_timestamp(df1.Time_Start, format=timeFmt)
                  - F.unix_timestamp(df.Time_Stop, format=timeFmt) <= 300)
```
This, however, does not work. How can I achieve the result I am looking for?
Edit: I forgot to mention that the time column in both DataFrames is a string.
Edit 2: I have tried the following and am receiving errors:
```python
from pyspark.sql.functions import expr

df2 = df2.withColumn("Time_Start", df2["Time_Start"].cast("timestamp"))
df = df.withColumn("Time_Stop", df["Time_Stop"].cast("timestamp"))

condition = df.Time_Stop + expr("INTERVAL 10 MINUTES") <= df2.Time_Start
df.filter(condition).show()
```
```
AnalysisException: u'resolved attribute(s) starttime#2251 missing from pickup_time#1964,dropoff_latitude#2090,tip#2180,dropoff_longitude#2072,pickup_latitude#2036,pickup_longitude#2018,payment_type#2108,dropoff_time#2268,mta_tax#2162,trip_distance#2000,fare_amount#2126,toll#2198,rate_code#2054,total#2216,row#1946,surcharge#2144 in operator !Filter (cast(dropoff_time#2268 + interval 10 minutes as timestamp) <= starttime#2251);'
```
Edit 3: I was able to work through this on my local machine, but I don't think my code will translate well when I run it on the cluster. Here is my code; maybe someone can point out ways to make it run faster or just look cleaner. I am still leaving this question open.
```python
df = list(df.toLocalIterator())
df1 = list(df1.toLocalIterator())

rand = []
time_limit = 600
for i in df:
    for j in df1:
        elapsed_time = (i['Time_Start'] - j['Time_Stop']).total_seconds()
        if abs(elapsed_time) <= time_limit:
            rand.append(j)
rand = list(set(rand))