
I have two DataFrames which are not of the same length. The first looks like this:

A      | Time_Stop             | B
------ | ----------------------|-----
Green  | 2016-10-01 00:10:15   | 77
Yellow | 2016-10-03 00:11:15   | 80
Blue   | 2016-10-04 00:12:15   | 6 

The second looks like this:

D      | Time_Start            | Z
------ | ----------------------|-----
Foo    | 2016-10-01 00:12:15   | 7
Cookie | 2016-10-03 00:45:15   | 99

My goal is to return only the rows from the first DataFrame whose `Time_Stop` is within a certain time limit (let's say within 5 minutes) of a `Time_Start` in the second, so the output should look something like this:

A      | Time_Stop             | B
------ | ----------------------|-----
Green  | 2016-10-01 00:10:15   | 77

I am having trouble figuring this out. So far I have tried this:

from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd' 'HH:mm:ss"
result = df.where(F.unix_timestamp(df1.Time_Start, format=timeFmt) - F.unix_timestamp(df.Time_Stop, format=timeFmt) <= 300)

This, however, is not working. How can I achieve the result I am looking for?

Edit: I forgot to mention that the time column for both DataFrames is in a string format.

Edit 2: I have tried the following and am receiving errors.

from pyspark.sql.functions import expr
df2 = df2.withColumn("Time_Start", df2["Time_Start"].cast("timestamp"))
df = df.withColumn("Time_Stop", df['Time_Stop'].cast('timestamp'))
condition = df.Time_Stop + expr("INTERVAL 10 MINUTES") <= df2.Time_Start
df.filter(condition).show()

AnalysisException: u'resolved attribute(s) starttime#2251 missing from pickup_time#1964,dropoff_latitude#2090,tip#2180,dropoff_longitude#2072,pickup_latitude#2036,pickup_longitude#2018,payment_type#2108,dropoff_time#2268,mta_tax#2162,trip_distance#2000,fare_amount#2126,toll#2198,rate_code#2054,total#2216,row#1946,surcharge#2144 in operator !Filter (cast(dropoff_time#2268 + interval 10 minutes as timestamp) <= starttime#2251);'

Edit 3: I was able to work through this on my local machine, but I don't think the code will translate well when I run it on the cluster. Here is my code; maybe someone can point out ways to make it run faster or look cleaner. I am still leaving this question open.

# Pull both DataFrames to the driver and compare every pair of rows locally
df = list(df.toLocalIterator())
df1 = list(df1.toLocalIterator())

time_limit = 600  # 10 minutes, in seconds
rand = []
for i in df:
    for j in df1:
        elapsed_time = (i['Time_Start'] - j['Time_Stop']).total_seconds()
        if abs(elapsed_time) <= time_limit:
            rand.append(j)
rand = list(set(rand))  # drop duplicate matches

1 Answer


Using toLocalIterator() with list() (which works exactly like collect()) and nested loops will be very inefficient on big datasets (it does not use Spark's capabilities at all).

A cartesian join seems to be the best solution in this case. Let's call the DataFrame with Time_Stop firstDF and the one with Time_Start secondDF, both with the date columns cast to timestamps. Then try the following:

from pyspark.sql import functions as F
interval = F.unix_timestamp(secondDF.Time_Start) - F.unix_timestamp(firstDF.Time_Stop)
firstDF.join(secondDF).where(F.abs(interval) < 300).select('A', 'Time_Stop', 'B')
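
For reference, here is a minimal, self-contained sketch of that join using the sample rows from the question; the local session, the explicit casts, and the variable names are assumptions made for the example, not part of the original answer.

from pyspark.sql import SparkSession, functions as F

# Assumed local session for trying the example out; not from the original answer.
spark = SparkSession.builder.master("local[*]").getOrCreate()

firstDF = spark.createDataFrame(
    [("Green", "2016-10-01 00:10:15", 77),
     ("Yellow", "2016-10-03 00:11:15", 80),
     ("Blue", "2016-10-04 00:12:15", 6)],
    ["A", "Time_Stop", "B"],
).withColumn("Time_Stop", F.col("Time_Stop").cast("timestamp"))

secondDF = spark.createDataFrame(
    [("Foo", "2016-10-01 00:12:15", 7),
     ("Cookie", "2016-10-03 00:45:15", 99)],
    ["D", "Time_Start", "Z"],
).withColumn("Time_Start", F.col("Time_Start").cast("timestamp"))

# Keep pairs whose timestamps are within 5 minutes (300 seconds) of each other
interval = F.unix_timestamp(secondDF.Time_Start) - F.unix_timestamp(firstDF.Time_Stop)
result = (firstDF.join(secondDF)
          .where(F.abs(interval) < 300)
          .select("A", "Time_Stop", "B"))

result.show()  # only the Green row should remain (within 5 minutes of Foo's Time_Start)

Note that if a single Time_Stop row matched several Time_Start rows it would appear more than once in the output; a dropDuplicates() on the selected columns would remove the repeats.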

2 Comments

Thanks for this. Looks like I was on the right track to begin with. Just as a follow-up question: will this compare each row of firstDF with every row of secondDF? They just happen to be ordered in my example, when in fact they are actually quite jumbled.
Yes, a cartesian join matches every row from firstDF with every row from secondDF; the initial ordering does not matter at all.
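
One practical note, hedged as an assumption about the Spark version rather than something stated in the thread: on Spark 2.x, implicit cartesian products are guarded, so making the product explicit with crossJoin() (available since Spark 2.1), or enabling the cross-join flag, can avoid surprises when this runs on the cluster. Reusing the names from the sketch above:

# Assumption for Spark 2.x setups: allow cartesian products if the planner rejects them
spark.conf.set("spark.sql.crossJoin.enabled", "true")

# Explicit crossJoin() states the intent more clearly than an unconditioned join()
joined = (firstDF.crossJoin(secondDF)
          .where(F.abs(F.unix_timestamp(secondDF.Time_Start)
                       - F.unix_timestamp(firstDF.Time_Stop)) < 300)
          .select("A", "Time_Stop", "B"))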
