
I am using graphframes to represent a graph in pyspark, built from a dataframe like this:

data = [
    ("1990", "1995"),
    ("1980", "1996"),
    ("1993", "1994"),
    ("1990", "2002"),
    ("1996", "2002"),
    ("1999", "2008"),
    ("2003", "2014"),
]

Nodes are represented as a concatenation of the date strings, i.e.:

nodes = [
    ("1990_1995"),
    ("1980_1996"),
    ("1993_1994"),
    ("1990_2002"),
    ("1996_2002"),
    ("1999_2008"),
    ("2003_2014"),
]

My problem is that I don't know how to define the edges. In theory I know how to do it, but I have some difficulties with the implementation.

The edge definition is based on this rule: if a date range (e.g. "1990-12-10" - "1995-12-10") overlaps with another date range (e.g. "1980-12-10" - "1996-12-10"), then the two date ranges are connected by an edge. How can I implement the creation of edges according to this rule in pyspark graphframes?

The edges should be:

node_and_edges = [
    ("1990_1995", ["1980_1996", "1993_1994", "1990_2002"]),
    ("1980_1996", ["1990_1995", "1993_1994", "1990_2002"]),
    ("1993_1994", ["1990_1995", "1980_1996", "1990_2002"]),
    ("1990_2002", ["1990_1995", "1980_1996", "1993_1994", "1996_2002", "1999_2008"]),
    ("1996_2002", ["1990_2002", "1999_2008"]),
    ("1999_2008", ["1990_2002", "1996_2002", "2003_2014"]),
    ("2003_2014", ["1999_2008"]),
]
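As a sanity check, the expected adjacency list above can be reproduced with a plain-Python sketch of the overlap rule (my own check, not part of the original code). Note the strict inequalities: in the expected output, ranges that merely touch at an endpoint, such as 1980_1996 and 1996_2002, are not connected.

```python
# Minimal plain-Python sketch of the overlap rule, for checking the
# expected adjacency list before implementing it in pyspark.
data = [
    ("1990", "1995"),
    ("1980", "1996"),
    ("1993", "1994"),
    ("1990", "2002"),
    ("1996", "2002"),
    ("1999", "2008"),
    ("2003", "2014"),
]

def overlaps(a, b):
    # Strict overlap of closed ranges [a0, a1] and [b0, b1]:
    # endpoint-touching ranges (1980_1996 vs 1996_2002) do not count.
    # Lexicographic string comparison is safe here because all years
    # have exactly four digits.
    return a[0] < b[1] and b[0] < a[1]

node_and_edges = [
    ("_".join(a), ["_".join(b) for b in data if b != a and overlaps(a, b)])
    for a in data
]
for node, neighbours in node_and_edges:
    print(node, neighbours)
```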

Code below:

from pyspark.sql import functions as F
from pyspark.sql import SparkSession as ss
spark = ss.builder.appName("test").getOrCreate()

data = [
    ("1990", "1995"),
    ("1980", "1996"),
    ("1993", "1994"),
    ("1990", "2002"),
    ("1996", "2002"),
    ("1999", "2008"),
    ("2003", "2014"),
]

rdd = spark.sparkContext.parallelize(data)

columns = ["start_date", "end_date"]

df = rdd.toDF(columns)

# Creating nodes
df = df.withColumn("node", F.concat_ws("_", F.col("start_date"), F.col("end_date")))

# Creating edges
# how?

df.printSchema()
df.show()

Execution output:

root
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- node: string (nullable = false)

+----------+--------+---------+
|start_date|end_date|     node|
+----------+--------+---------+
|      1990|    1995|1990_1995|
|      1980|    1996|1980_1996|
|      1993|    1994|1993_1994|
|      1990|    2002|1990_2002|
|      1996|    2002|1996_2002|
|      1999|    2008|1999_2008|
|      2003|    2014|2003_2014|
+----------+--------+---------+

Overlap edges rule: (figure omitted)

1 Answer

How about a range join?

It involves a Cartesian product under the hood, which is not fun and might not work for very large datasets, but you can optimize it, at least on the Databricks runtime (not sure whether it is available outside it), with the range_join hint.

from pyspark.sql.functions import col

data = [
    ("1990", "1995"),
    ("1980", "1996"),
    ("1993", "1994"),
    ("1990", "2002"),
    ("1996", "2002"),
    ("1999", "2008"),
    ("2003", "2014"),
]

df = (
    spark.createDataFrame(data, schema="start string, end string")
    .withColumn("start", col("start").cast("integer"))
    .withColumn("end", col("end").cast("integer"))
)

# Second copy of the dataframe with renamed columns for the self-join
df2 = df.toDF("start2", "end2")

# Keep pairs where start2 falls inside [start, end], then drop self-pairs
df \
    .join(df2, on=col("start2").between(col("start"), col("end"))) \
    .filter((col("start") != col("start2")) | (col("end") != col("end2"))) \
    .show()


+-----+----+------+----+
|start|end |start2|end2|
+-----+----+------+----+
|1990 |1995|1993  |1994|
|1980 |1996|1990  |1995|
|1980 |1996|1993  |1994|
|1990 |1995|1990  |2002|
|1980 |1996|1990  |2002|
|1980 |1996|1996  |2002|
|1990 |2002|1990  |1995|
|1990 |2002|1993  |1994|
|1990 |2002|1996  |2002|
|1990 |2002|1999  |2008|
|1996 |2002|1999  |2008|
|1999 |2008|2003  |2014|
+-----+----+------+----+
