I have two DataFrames in PySpark:
d1 with columns (x, y, value) and d2 with columns (k, v, value). The entries in d1 are unique (you can consider column x alone as a unique key, and likewise y alone).
x y value
a b 0.2
c d 0.4
e f 0.8
d2 has the following format:
k v value
a c 0.7
k k 0.3
j h 0.8
e p 0.1
a b 0.1
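For reference, here is a minimal setup that reproduces these two DataFrames (this is my own sketch; it assumes an existing SparkSession named spark):

# reproduce d1 and d2 from the tables above
d1 = spark.createDataFrame(
    [("a", "b", 0.2), ("c", "d", 0.4), ("e", "f", 0.8)],
    ["x", "y", "value"],
)
d2 = spark.createDataFrame(
    [("a", "c", 0.7), ("k", "k", 0.3), ("j", "h", 0.8),
     ("e", "p", 0.1), ("a", "b", 0.1)],
    ["k", "v", "value"],
)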
I need to filter d2 according to the co-occurrences in d1, i.e., (a, c, 0.7) and (e, p, 0.1) should be deleted, since a can occur only with b, and similarly for e (which can occur only with f).
I tried to select the x and y columns from d1 (only x is shown below; y would be handled the same way):
from pyspark.sql import functions as sf
from pyspark.sql.types import BooleanType

# collect the x values of d1 onto the driver and broadcast them
sourceList = d1.select("x").collect()
sourceList = [row.x for row in sourceList]
sourceList_b = sc.broadcast(sourceList)
then
# one attempt: a UDF testing membership in the broadcast list (currently unused)
check_id_isin = sf.udf(lambda x: x in sourceList_b.value, BooleanType())
# another attempt: filter d2 directly with isin
d2 = d2.where(~d2.k.isin(sourceList_b.value))
For small datasets this works well, but for large ones the collect causes an exception. I want to know if there is better logic to compute this step.
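One direction I am considering, instead of collecting to the driver, is to express the filter as joins. This is an untested sketch, and whether rows like (k, k, 0.3) and (j, h, 0.8), whose k never appears in d1.x, should survive depends on the exact requirement:

# untested sketch: co-occurrence filter via joins instead of collect/broadcast
pairs = d1.select(sf.col("x").alias("k"), sf.col("y").alias("v"))

# rows whose (k, v) matches an allowed (x, y) pair, e.g. (a, b, 0.1)
allowed = d2.join(pairs, on=["k", "v"], how="left_semi")

# rows whose k never appears in d1.x at all, e.g. (k, k, 0.3) and (j, h, 0.8)
unconstrained = d2.join(pairs.select("k"), on="k", how="left_anti")

d2_filtered = allowed.union(unconstrained)

Would something like this scale better, or is there a more idiomatic way?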