
I have a list of directed edges which represent a tree. 'u v' means u is a child of v.

from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)
lines = sc.textFile("/content/sample_data/data.txt")
lines.take(10)
['0 0', '1 0', '2 0', '3 1', '4 1', '5 2', '6 2', '7 3', '8 3', '9 4']

I converted the above to the following form stored as intermediate:

[(0, ('out', 0)),
 (0, ('in', 0)),
 (1, ('out', 0)),
 (0, ('in', 1)),...]
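For context, the conversion from raw edge lines to that intermediate shape can be sketched roughly like this (the helper name is an assumption, not my actual code):

```python
# Sketch: each edge line "u v" (u is a child of v) yields two records:
#   (u, ('out', v))  -- u's parent link
#   (v, ('in', u))   -- one of v's child links
def to_records(line):
    u, v = map(int, line.split())
    return [(u, ('out', v)), (v, ('in', u))]

# intermediate = lines.flatMap(to_records)
```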

From the above, I am trying to build an adjacency list of the form:

[(8721, [('out', 4360), ('in', 17443), ('in', 17444)]),
 (9291, [('out', 4645), ('in', 18583), ('in', 18584)]),
 (9345, [('out', 4672), ('in', 18691), ('in', 18692)]),..]

Here, the first row says that 8721 is a child of 4360, and that 17443 and 17444 are children of 8721.

I am using the groupByKey / reduceByKey methods exposed by Spark:

intermediate.groupByKey().mapValues(list)

The line above is slow: almost 250 seconds for 100 MB of test data on an 8-core machine with 12 GB RAM. I eventually have to run this on >15 GB of data in a distributed environment.

I understand that groupByKey causes shuffling of data across all nodes. Is there any way to avoid it in my case? Any suggestions on how to optimise this operation are appreciated.
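For reference, one RDD-level variant that is sometimes suggested is aggregateByKey, which enables map-side combining. Note, though, that when the result is the full list of values per key, the partition-local lists are as large as the raw values, so the shuffle volume barely shrinks (the function names below are illustrative, not my actual code):

```python
# Illustrative sketch only: aggregateByKey with a list accumulator.
# seq_op folds one value into a partition-local list; comb_op merges
# two partition-local lists after the shuffle.
def seq_op(acc, value):
    acc.append(value)
    return acc

def comb_op(acc1, acc2):
    acc1.extend(acc2)
    return acc1

# adjacency = intermediate.aggregateByKey([], seq_op, comb_op)
```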

1 Answer
You can't avoid the shuffle, since you are grouping rows of your dataset. However, you can use the DataFrame API instead of RDDs; DataFrame operations go through Spark's Catalyst optimizer and Tungsten execution engine, so they are generally more performant than equivalent RDD code.

If your txt file is in the following form:

0 0
0 1
...

Then you can read it as a dataframe (adding the imports the snippets below rely on):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('test.txt', sep=' ')
df.show()

+---+---+
|_c0|_c1|
+---+---+
|  0|  0|
|  1|  0|
|  2|  0|
|  3|  1|
|  4|  1|
|  5|  2|
|  6|  2|
|  7|  3|
|  8|  3|
|  9|  4|
+---+---+

Attach a type column and take the union of the two edge directions, matching the convention in your question: the original (child, parent) rows tagged 'out' (the parent link) and the reversed (parent, child) rows tagged 'in' (the child links). Note that unionAll is deprecated since Spark 2.0 in favour of union:

df2 = df.withColumn('_c2', f.lit('out')).union(df.select('_c1', '_c0').withColumn('_c2', f.lit('in')))
df2.show()

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  0|  0|out|
|  1|  0|out|
|  2|  0|out|
|  3|  1|out|
|  4|  1|out|
|  5|  2|out|
|  6|  2|out|
|  7|  3|out|
|  8|  3|out|
|  9|  4|out|
|  0|  0| in|
|  0|  1| in|
|  0|  2| in|
|  1|  3| in|
|  1|  4| in|
|  2|  5| in|
|  2|  6| in|
|  3|  7| in|
|  3|  8| in|
|  4|  9| in|
+---+---+---+

and groupBy the result.

df3 = df2.groupBy('_c0').agg(f.collect_list(f.array('_c2','_c1'))).toDF('edge', 'list')
df3.show(truncate=False)
df3.printSchema()

+----+-------------------------------------+
|edge|list                                 |
+----+-------------------------------------+
|7   |[[out, 3]]                           |
|3   |[[out, 1], [in, 7], [in, 8]]         |
|8   |[[out, 3]]                           |
|0   |[[out, 0], [in, 0], [in, 1], [in, 2]]|
|5   |[[out, 2]]                           |
|6   |[[out, 2]]                           |
|9   |[[out, 4]]                           |
|1   |[[out, 0], [in, 3], [in, 4]]         |
|4   |[[out, 1], [in, 9]]                  |
|2   |[[out, 0], [in, 5], [in, 6]]         |
+----+-------------------------------------+

root
 |-- edge: string (nullable = true)
 |-- list: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
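If you need the result back in the (node, [(type, node)]) tuple form from your question, one possible per-row conversion looks like this (the helper name is hypothetical; note the columns come back as strings because the CSV was read without a schema):

```python
# Hypothetical helper: rebuild one (node, [(type, other_node)]) tuple from a
# grouped row, casting the string-typed values back to int.
def row_to_adj(edge, pairs):
    return (int(edge), [(t, int(n)) for t, n in pairs])

# adjacency = df3.rdd.map(lambda r: row_to_adj(r['edge'], r['list']))
```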