
I have two dataframes and I would like to join them based on one column, with the caveat that this column is a timestamp, and the timestamps have to be within a certain offset (5 seconds) of each other in order for the records to join. More specifically, a record in dates_df with date=1/3/2015:00:00:00 should be joined with a record in events_df with time=1/3/2015:00:00:01, because the two timestamps are within 5 seconds of each other.

I'm trying to get this logic working with PySpark, and it is extremely painful. How do people do joins like this in Spark?

My approach is to add two extra columns to dates_df, lower_timestamp and upper_timestamp, that bound the date with a 5-second offset on either side, and then perform a conditional join. This is where it fails; more specifically:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()

The plan captures only the last part of the condition:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....

and it gives me the wrong result.

Do I really have to do a full-blown Cartesian join for each inequality, removing duplicates as I go along?

Here is the full code:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)

def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)


lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)

dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)

dates_df.show()

# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
           withColumn('upper_timestamp', upper_range(dates_df['date']))


event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)

events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)

events_df.show()

# finally, join the data
joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)    

joined_df.show()

I get the following output:

+-----+--------------------+
| name|                date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+

+--------------------+-------+
|                time|  event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+


+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
  • Spark SQL seems to handle it gracefully. results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and events.time < dates.upper_timestamp") does the trick. Commented Jun 3, 2015 at 22:06
  • Just a thought: change dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp to something like dates_df.lower_timestamp < events_df.time and events_df.time < dates_df.upper_timestamp in the DataFrame version too. There is no reason why they should behave differently. Commented Jun 3, 2015 at 23:41
  • @ayan: I have tried it this way too, and it does not work in the same way. Commented Jun 3, 2015 at 23:46
  • Looks like a bug then..... Commented Jun 4, 2015 at 0:00
  • I do not know Python, but this should be very simple in Scala. You should not even need to create the new columns. I would create a UDF that either adds or subtracts seconds to a Timestamp and returns it, then do the join where the one Timestamp is between the results of the two UDF calls. Commented Jun 4, 2015 at 16:39

2 Answers


I ran the Spark SQL query with explain() to see how it is planned, and replicated the same behavior in Python. First, here is how to do it with Spark SQL:

dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and  events.time < dates.upper_timestamp")
results.explain()

This works, but the question was about doing it in Python, so the solution seems to be a plain join followed by two filters:

joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)

joined_df.explain() yields the same plan as the Spark SQL results.explain(), so I assume this is how things are done.
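
As an aside (a sketch against the dataframes from the question, not something from the original answer): Python rewrites the chained comparison a < b < c into (a < b) and (b < c), and the intermediate and expects a plain boolean rather than a Column, which appears to be why only the time < upper_timestamp filter made it into the plan (newer PySpark versions raise an error here instead of silently dropping a condition). Building the condition as a single Column with the bitwise & operator avoids that pitfall, although the plan may still be a Cartesian product plus filter, since there is no equality to join on:

# single join condition built with &, equivalent to the two filters above
join_cond = (dates_df.lower_timestamp < events_df.time) & \
            (events_df.time < dates_df.upper_timestamp)

joined_df = dates_df.join(events_df, join_cond)
joined_df.explain()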



Although this is a year later, it might help others.

As you said, a full Cartesian product is insane in your case. Your matching records will be close in time (within 5 seconds), so you can take advantage of that and save a lot of time: first group the records into buckets based on their timestamp, then join the two dataframes on that bucket, and only then apply the filter. Using that method makes Spark use a SortMergeJoin instead of a CartesianProduct and greatly boosts performance.

There is a small caveat here: each record must be matched against both its own bucket and the next one.

It's explained in more detail in my blog post, with working code examples (Scala + Spark 2.0, but the same can be implemented in Python; a rough sketch follows below the link):

http://zachmoshe.com/2016/09/26/efficient-range-joins-with-spark.html
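
A rough Python sketch of the same idea, assuming Spark 1.5+ (for unix_timestamp), the 5-second offset from the question, and a bucket width of 10 seconds (twice the offset) so that each [lower_timestamp, upper_timestamp] range spans at most two buckets; the column names and helper expressions here are illustrative, not taken from the blog post:

from pyspark.sql import functions as F

window = 10  # bucket width in seconds, at least as wide as the full time range

# each event falls into exactly one bucket
events_b = events_df.withColumn(
    'bucket', (F.unix_timestamp(events_df['time']) / window).cast('long'))

# each [lower_timestamp, upper_timestamp] range spans at most two buckets,
# so duplicate every dates_df row into its own bucket and the next one
lower_bucket = (F.unix_timestamp(dates_df['lower_timestamp']) / window).cast('long')
dates_b = dates_df.withColumn('bucket',
    F.explode(F.array(lower_bucket, lower_bucket + 1)))

# the equi-join on bucket lets Spark pick a SortMergeJoin instead of a
# CartesianProduct; the exact 5-second condition is applied afterwards
joined = dates_b.join(events_b, 'bucket').filter(
    (dates_b.lower_timestamp < events_b.time) &
    (events_b.time < dates_b.upper_timestamp))

The result stays exact because the bucket width is at least as wide as the full range, which is what makes matching a bucket and its neighbour sufficient before the final filter.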

1 Comment

  • Don't link to a random blog post (even if it is yours); write the example here, and then link to your blog.
