
Consider the following code:

question = spark.createDataFrame([{'A':1,'B':5},{'A':2,'B':5},
                                  {'A':3,'B':5},{'A':3,'B':6}])
#+---+---+
#|  A|  B|
#+---+---+
#|  1|  5|
#|  2|  5|
#|  3|  5|
#|  3|  6|
#+---+---+

How can I create a Spark DataFrame that looks as follows:

solution = spark.createDataFrame([{'C':1,'D':2},{'C':1,'D':3},
                                  {'C':2,'D':3},{'C':5,'D':6}])
#+---+---+
#|  C|  D|
#+---+---+
#|  1|  2|
#|  1|  3|
#|  2|  3|
#|  5|  6|
#+---+---+

This is the notion of triadic closure, where I am connecting the third edge of the triangle based upon which edges are already connected.

I must have (1,2) since (1,5) and (2,5) are present; (1,3) since (1,5) and (3,5) are present; and (2,3) since (2,5) and (3,5) are present. I must also have (5,6) since (3,5) and (3,6) are present (the closure applies in both directions). There should NOT be an additional entry for (5,6), since 3 is the only value in A that maps to 6.
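For reference, here is a minimal plain-Python sketch of the rule described above (not Spark; the names are my own): treat each row as an undirected edge, collect each node's neighbours, and close any two nodes that share one.

from itertools import combinations

edges = [(1, 5), (2, 5), (3, 5), (3, 6)]

# Collect each node's neighbours, treating edges as undirected.
neighbours = {}
for a, b in edges:
    neighbours.setdefault(a, set()).add(b)
    neighbours.setdefault(b, set()).add(a)

# Any two nodes that share a neighbour get a closing edge.
closed = set()
for nbrs in neighbours.values():
    closed.update(combinations(sorted(nbrs), 2))

print(sorted(closed))
#[(1, 2), (1, 3), (2, 3), (5, 6)]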

  • so would there have to be another entry for C=5, D=6? Commented Feb 20, 2018 at 22:31
  • If I am getting this question correctly, could you just: 1) append the original dataframe to itself, but with B and A switched.... 2) group by A... 3) flatmap each group to all pairwise combinations (I think there are Scala functions for this).... 4) map the new column to separate C and D columns.... 5) filter duplicates, if required Commented Feb 20, 2018 at 22:38
  • No, there should not be an additional entry for C=5, D=6 since no two values in A map to 6. I must have (1,2) since (1,5) and (2,5) are present, I must have (1,3) since (1,5) and (3,5) are present, and I must have (2,3) since (2,5) and (3,5) are present. Since there isn't a second instance in A that maps to 6, it does not get added. Does this help clarify? Commented Feb 20, 2018 at 23:59
  • So it only goes one way, from column A to B? Both 5 and 6 are related to 3 in the above example in the opposite direction. Also, could you add the clarification in the comments to the question itself? Commented Feb 21, 2018 at 1:51
  • Yes, very good point. It does need to go both ways. I will include the edit in the original post. Thank you for the feedback Commented Feb 21, 2018 at 2:04

2 Answers


Try this:

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType
from itertools import combinations

df = spark.createDataFrame([{'A':1,'B':5},{'A':2,'B':5},
                            {'A':3,'B':5},{'A':3,'B':6}])

def pairs(list_):
    # Emit every unordered pair of distinct values; None if fewer than two.
    if len(set(list_)) > 1:
        return [[int(x[0]), int(x[1])] for x in combinations(set(list_), r=2)]
    else:
        return None

triadic_udf = F.udf(pairs, ArrayType(ArrayType(IntegerType())))
cols = ['C', 'D']
splits = [F.udf(lambda val: val[0], IntegerType()),
          F.udf(lambda val: val[1], IntegerType())]

# Pairs of A values that share a B value...
df1 = df.groupby('B').agg(F.collect_list('A').alias('A'))\
        .withColumn('pairs', F.explode(triadic_udf(F.col('A'))))\
        .dropna().select('pairs')

# ...and pairs of B values that share an A value.
df2 = df.groupby('A').agg(F.collect_list('B').alias('B'))\
        .withColumn('pairs', F.explode(triadic_udf(F.col('B'))))\
        .dropna().select('pairs')

# Split each [x, y] pair into the C and D columns.
solution = df1.union(df2).select([s('pairs').alias(c) for s, c in zip(splits, cols)])

solution.show()
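Against the sample data this should produce the four expected rows (df1 contributes (1,2), (1,3), (2,3) from grouping on B; df2 contributes (5,6) from grouping on A), though row order may vary:

#+---+---+
#|  C|  D|
#+---+---+
#|  1|  2|
#|  1|  3|
#|  2|  3|
#|  5|  6|
#+---+---+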

6 Comments

If the answer is correct, you can mark it as correct and upvote to help others find it easily ;)
I just made an account and my reputation score is less than 15 so it will not change the publicly displayed score. Will do once my reputation score goes up though!
I think this solution is pretty much good, except: imagine that instead of the (3,6) pair, it was actually (6,3). I don't think it'll find that (5,6) pair.
@SiddSingal I think if that is the case, then the (5,6) pair won't exist at all, since there will not be any common element between the two in any given column. As per the question, I think we are looking at common pairs which have the same element mapped to the other column.
@mayankagrawal It seems that in a triadic closure, three different pairs make up the closure... so if we have (1,5) and (2,5), then we need a pair (1,2), and the three pairs (1,5), (2,5), and (1,2) make up a triadic closure... which also means that if we had just (2,5) and (1,2), we should be able to come up with (1,5).
// Assumes spark-shell, where spark.implicits._ is already imported.
val df = sc.parallelize(Array((1,5),(2,5),(3,5),(3,6),(1,7),(2,7))).toDF("A","B")

df.union(df.select("B","A"))        // append the frame with A and B switched
  .groupByKey(r => r.getInt(0))     // group by the first column
  .flatMapGroups({ (k, vs) =>
    // emit every pairwise combination of the grouped values
    vs.map(_.getInt(1)).toArray.combinations(2).map(a => (a(0), a(1)))
  })
  .dropDuplicates
  .show

This is in Scala, not Python, but should be easy to convert (a rough PySpark translation is sketched after the list). I included some extra data points to show why dropDuplicates is necessary. I basically just followed the steps I wrote above in a comment:
1) append the original dataframe to itself, but with B and A switched
2) group by A
3) flatMap each group to all pairwise combinations (Scala's combinations does this)
4) map the pairs to separate C and D columns (I didn't actually do this)
5) filter duplicates, if required
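Following the same steps, a rough PySpark translation might look like this (my sketch, not the answerer's code; it assumes an active spark session and sorts each pair so dropDuplicates also catches reversed duplicates):

import pyspark.sql.functions as F
from itertools import combinations

df = spark.createDataFrame([(1,5),(2,5),(3,5),(3,6),(1,7),(2,7)], ['A','B'])

pairs = (df.union(df.select('B','A'))                                # 1) append with A and B switched
           .groupby('A').agg(F.collect_set('B').alias('Bs'))         # 2) group by A
           .rdd.flatMap(lambda r: combinations(sorted(r['Bs']), 2))  # 3) pairwise combinations
           .toDF(['C','D'])                                          # 4) separate C and D columns
           .dropDuplicates())                                        # 5) filter duplicates

pairs.show()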

