Suppose I have a DataFrame with two columns, C1 and C2:

+---+-----+
|C1 | C2  |
+---+-----+
|A  |  B  |
|C  |  D  |
|A  |  E  |
|E  |  F  |
+---+-----+

My goal is to collect the intersections into arrays, so that values linked directly or through a chain of rows end up in the same array:

+--------------+
| intersections|
+--------------+
|[A, B, E, F]  |
|[C, D]        |
+--------------+

How can this be done efficiently if the DataFrame has a large number of rows (~1 billion)?

  • This problem is best solved with a network graph approach. Load the data into a graph where the distinct values from both columns are the nodes and the pairs between the columns are the edges. First test whether the graph is connected: that would mean every value is linked to every other value, so there is a single group and you don't have to proceed. If the graph is not connected, calculate clusters (communities, i.e. connected components); the nodes in each cluster represent your intersections. Commented Sep 16, 2021 at 20:06
  • Please check this question. You could use a similar approach. Commented Sep 16, 2021 at 20:08
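
To make the graph idea from the comment above concrete, here is a minimal single-machine sketch in plain Scala. It uses a union-find structure, which is a swapped-in technique for illustration and not what GraphFrames runs internally. The names `IntersectionsSketch`, `intersections`, `find` and `union` are hypothetical, and the sketch assumes the pairs fit in memory, so it is not suitable for ~1 billion rows.

```scala
import scala.collection.mutable

object IntersectionsSketch {

  // Groups values that are linked, directly or transitively, by any pair.
  // Hypothetical helper for illustration only; assumes the data fits in memory.
  def intersections(pairs: Seq[(String, String)]): Set[Set[String]] = {
    // parent(x) points towards the representative of x's group
    val parent = mutable.Map.empty[String, String]

    // Returns the representative of x, registering x on first sight
    // and shortening the path to the representative along the way.
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else {
        val root = find(p)
        parent(x) = root
        root
      }
    }

    // Merges the groups containing a and b.
    def union(a: String, b: String): Unit = {
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(ra) = rb
    }

    // Every (C1, C2) row is an edge between two values.
    pairs.foreach { case (a, b) => union(a, b) }

    // Values sharing a representative form one group.
    parent.keys.toVector.groupBy(find).values.map(_.toSet).toSet
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(("A", "B"), ("C", "D"), ("A", "E"), ("E", "F"))
    intersections(pairs).foreach { group =>
      println(group.toSeq.sorted.mkString("[", ", ", "]"))
    }
  }
}
```

The recursive `find` is kept for readability; a very long chain of values could overflow the stack, so an iterative version would be safer for larger inputs.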

1 Answer

One solution is the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html).

DISCLAIMER: tested with Spark 2.4.4 and GraphFrames 0.7.0

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import org.graphframes.GraphFrame

object SparkApp extends App {

val appName = "appName"
val master = "local[*]"
  
val spark = SparkSession
  .builder
  .appName(appName)
  .master(master)
  .getOrCreate
 
import spark.implicits._

val dataTest =
      Seq(
        ("A", "B"),
        ("C", "D"),
        ("A", "E"),
        ("E", "F")
      ).toDF("C1", "C2")

// a checkpoint directory is required by the default connectedComponents algorithm
spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

// build the graph: vertices are the distinct values of both columns, edges are the (C1, C2) pairs
val graphTest: GraphFrame =
  GraphFrame(
    dataTest.select('C1 as "id").union(dataTest.select('C2 as "id")).distinct,
    dataTest.withColumnRenamed("C1", "src").withColumnRenamed("C2", "dst")
  )

val graphComponentsTest = graphTest.connectedComponents.run()

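// collect the ids of each component into one sorted array and keep only that column
val clustersResultTestDF =
  graphComponentsTest
    .groupBy("component")
    .agg(sort_array(collect_list("id")) as "intersections")
    .select("intersections")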


clustersResultTestDF.show
}

Output (row order may vary):

+--------------+
| intersections|
+--------------+
|[A, B, E, F]  |
|[C, D]        |
+--------------+