
I have two DataFrames in PySpark:

d1: (x, y, value) and d2: (k, v, value). The entries in d1 are unique (you can consider column x alone as a key, and likewise y alone as a key):

x y value
a b 0.2
c d 0.4
e f 0.8

d2 is the following format:

k v value
a c 0.7
k k 0.3
j h 0.8
e p 0.1
a b 0.1

I need to filter d2 according to the co-occurrences in d1, i.e., (a, c, 0.7) and (e, p, 0.1) should be deleted, since a can occur only with b, and similarly for e.

I tried to select the x and y columns from d1:

from pyspark.sql import functions as sf
from pyspark.sql.types import BooleanType

sourceList = d1.select("x").collect()
sourceList = [row.x for row in sourceList]

sourceList_b = sc.broadcast(sourceList)

then

check_id_isin = sf.udf(lambda x: x in sourceList_b.value, BooleanType())
d2 = d2.where(~d2.k.isin(sourceList_b.value))

For small datasets this works well, but for large ones the collect causes an exception. I want to know if there is a better way to compute this step.

3 Answers


One way could be to join d1 to d2, then fill the missing values in the column y from the column v using coalesce, and finally filter out the rows where y and v are different, such as:

import pyspark.sql.functions as F

(d2.join(d1.select('x','y').withColumnRenamed('x','k'),  # rename x to k for an easier join
          on=['k'], how='left')                           # left join to keep only d2's rows
   .withColumn('y', F.coalesce('y', 'v'))                 # fill the values missing in y with the ones from v
   .filter(F.col('v') == F.col('y'))                      # keep only the rows where v equals y
   .drop('y')                                             # drop the column y, no longer needed
   .show())

and you get:

+---+---+-----+
|  k|  v|value|
+---+---+-----+
|  k|  k|  0.3|
|  j|  h|  0.8|
+---+---+-----+

and it should also keep any rows where the couple (k, v) in d2 matches a couple (x, y) in d1.
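
For reference, here is a minimal sketch to build the sample DataFrames and run the join above (assuming an active SparkSession named spark); with the sample d2, the (a, b, 0.1) row is also kept, consistent with the previous sentence:

import pyspark.sql.functions as F

# Hypothetical reconstruction of the sample data from the question
d1 = spark.createDataFrame([('a', 'b', 0.2), ('c', 'd', 0.4), ('e', 'f', 0.8)], ['x', 'y', 'value'])
d2 = spark.createDataFrame([('a', 'c', 0.7), ('k', 'k', 0.3), ('j', 'h', 0.8),
                            ('e', 'p', 0.1), ('a', 'b', 0.1)], ['k', 'v', 'value'])

(d2.join(d1.select('x', 'y').withColumnRenamed('x', 'k'), on=['k'], how='left')
   .withColumn('y', F.coalesce('y', 'v'))
   .filter(F.col('v') == F.col('y'))
   .drop('y')
   .show())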



So you have two problems here:

  1. Logic for joining these two tables:

This can be done by performing an inner join on two columns instead of one. This is the code for that:

# Create an expression wherein you do an inner join on two cols
joinExpr = ((d1.x == d2.k) & (d1.y == d2.v))
joinDF = d1.join(d2, joinExpr)
  2. The second problem is speed. There are multiple ways of fixing it. Here are my top two:

a. If one of the dataframes is significantly smaller (usually under 2 GB) than the other dataframe, then you can use the broadcast join. It essentially copies the smaller dataframe to all the workers so that there is no need to shuffle while joining. Here is an example:

from pyspark.sql.functions import broadcast

joinExpr = ((d1.x == d2.k) & (d1.y == d2.v))
joinDF = d1.join(broadcast(d2), joinExpr)
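
If you would rather let Spark decide when to broadcast on its own, you can also raise the size threshold it uses for automatic broadcasting; a minimal sketch, assuming a SparkSession named spark (the 100 MB value is only illustrative):

# Raise the size (in bytes) below which Spark automatically broadcasts the smaller side of a join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))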

b. Try adding more workers and increasing the memory.
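
As a rough illustration of option b, executor resources can be requested when building the session; a minimal sketch using standard Spark properties (the app name and values are placeholders to adapt to your cluster):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("filter-d2-by-d1")                # hypothetical app name
         .config("spark.executor.instances", "8")   # more workers
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "8g")     # more memory per executor
         .getOrCreate())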



What you probably want to do is think of this in relational terms: join d1 and d2 on d1.x = d2.k AND d1.y = d2.v. An inner join will drop any records from d2 that don't have a corresponding pair in d1. By using a join, Spark will do a cluster-wide shuffle of the data, allowing for much greater parallelism and scalability compared to a broadcast exchange, which generally caps out at about ~10 MB of data (the default cutover point Spark uses between a shuffle join and a broadcast join).
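
A minimal sketch of that inner join, using the column names from the question:

# Inner join on both columns: rows of d2 without a matching (x, y) pair in d1 are dropped
filtered = (d2.join(d1, (d2.k == d1.x) & (d2.v == d1.y), 'inner')
              .select(d2.k, d2.v, d2.value))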

Also, as an FYI, WHERE (a, b) IN (...) gets translated into a join in most cases, unless the (...) is a small set of data.
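
If you want that semi-join behaviour explicitly, Spark exposes it as the left_semi join type, which keeps only d2's columns; a minimal sketch:

# Semi join: keep the rows of d2 that have a matching (x, y) pair in d1, without pulling in d1's columns
filtered = d2.join(d1, (d2.k == d1.x) & (d2.v == d1.y), 'left_semi')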

https://github.com/vaquarkhan/vaquarkhan/wiki/Apache-Spark--Shuffle-hash-join-vs--Broadcast-hash-join

