
I have a Spark data frame that looks like below:

+--+-----+---------+
|id|phone|  address|
+--+-----+---------+
| 0|  123| james st|
| 1|  177|avenue st|
| 2|  123|spring st|
| 3|  999|avenue st|
| 4|  678|  5th ave|
+--+-----+---------+

I am trying to use the graphframes package to identify the connected components of the ids using phone and address from the Spark data frame above, so this data frame can be treated as the vertices data frame of the graph.

I am wondering what would be the optimal approach to creating the edges data frame of the graph to feed into the connectedComponents() function in graphframes?

Ideally, the edges data frame should look like below:

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| 0 |  2|  same_phone|
| 1 |  3|same_address|
+---+---+------------+

Finally, the connectedComponents() results should be like below. Ids 0 & 2 are in the same component based on the same_phone relationship, and 1 & 3 based on the same_address relationship. That leaves 4 as its own component, with no connection to the other ids.

+---+-------------------+
|id |connected_component|
+---+-------------------+
|0  |1                  |
|1  |2                  |
|2  |1                  |
|3  |2                  |
|4  |3                  |
+---+-------------------+

Thanks in advance!

Comments

  • what if there is one more row with id=5, phone=123 and address=avenue st, what would be the desired connected_component? Commented Dec 29, 2020 at 2:22
  • here's a nice example explaining how to find the connected components of a graph: towardsdatascience.com/… Commented Dec 29, 2020 at 6:13
  • Hi @jxc, in that case, ids (0, 1, 2, 3, 5) would be in the same component, leaving id 4 in a separate component. Commented Dec 29, 2020 at 14:34
  • @user238607 Yes, this is a good reference. I looked into it and a few others; they all build the edge list by manually creating an edge data frame. That won't work in my case, as my vertices data have a few million records. Commented Dec 29, 2020 at 14:37
  • The connected component IDs are totally unrelated to the edge IDs. They start from 0 and count up. Commented May 20, 2022 at 1:45

1 Answer
from functools import reduce

# For each attribute column (phone, address), self-join the vertices on that
# column to find ids sharing a value, keep pairs with src < dst to avoid
# duplicates and self-loops, then union the per-column edge sets.
edges = reduce(
    lambda x, y: x.union(y),
    [df.alias('t1')
       .join(df.alias('t2'), c)
       .filter('t1.id < t2.id')
       .selectExpr('t1.id src', 't2.id dst', "'same_%s' relationship" % c)
     for c in df.columns[1:]]
)

edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  0|  2|  same_phone|
|  1|  3|same_address|
+---+---+------------+
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Turn each edge into a sorted [src, dst] array so a pair is counted once,
# append a singleton array for every id that appears in no edge (anti-join),
# number the resulting groups with a window, then explode back to one row
# per id.
connect = edges.select(
    F.array_sort(F.array('src', 'dst')).alias('arr')
).distinct().union(
    df.join(edges, (df.id == edges.src) | (df.id == edges.dst), 'anti')
      .select(F.array('id'))
).withColumn(
    'connected_component',
    F.row_number().over(Window.orderBy('arr'))
).select(F.explode('arr').alias('id'), 'connected_component')

connect.show()
+---+-------------------+
| id|connected_component|
+---+-------------------+
|  0|                  1|
|  2|                  1|
|  1|                  2|
|  3|                  2|
|  4|                  3|
+---+-------------------+
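The grouping-and-numbering logic above can be mirrored in plain Python to sanity-check the expected labels (a sketch with the sample data hard-coded; note it relies on each component having at most one edge, as in this example):

```python
# Sample data from the question (plain Python stand-in, not Spark).
edges = [(0, 2, 'same_phone'), (1, 3, 'same_address')]
ids = [0, 1, 2, 3, 4]

# Distinct sorted (src, dst) pairs -- mirrors array_sort + distinct.
pairs = sorted({tuple(sorted((s, d))) for s, d, _ in edges})

# Ids touched by some edge; the rest become singleton groups (the anti-join).
connected = {i for p in pairs for i in p}
groups = pairs + [(i,) for i in ids if i not in connected]

# Number groups in sorted order (row_number over the array) and explode.
labels = {i: n for n, g in enumerate(sorted(groups), 1) for i in g}
# labels -> {0: 1, 2: 1, 1: 2, 3: 2, 4: 3}
```

This reproduces the table above: ids 0 and 2 get component 1, ids 1 and 3 get component 2, and the isolated id 4 gets component 3.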

2 Comments

Note that the general case still requires graphframes' connectedComponents, as the OP mentioned; given their other post, it should be an easy fix now for this task.
Thank you both for answering my questions. Both this and the self-join solution we discussed in the other post work for my task!
