4

I have created a graph in Spark GraphX using the following codes. (See my question and solution)

import scala.math.random
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner

object SparkER {

  val nPartitions: Integer = 4
  val n: Long = 100
  val p: Double = 0.1

  def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
    (0L until n).filter(_ % nPartitions == i).toIterator
  }

  def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
    (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
  }

  def genEdgesForPartition(iter: Iterator[Long]) = {
    val random = new Random(new java.security.SecureRandom())
    iter.flatMap(genEdgesForId(p, n, random))
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark ER").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val empty = sc.parallelize(Seq.empty[Int], nPartitions)
    val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))

    val edges = ids.mapPartitions(genEdgesForPartition)
    val vertices: VertexRDD[Unit] = VertexRDD(ids.map((_, ())))

    val graph = Graph(vertices, edges)

    val cc = graph.connectedComponents().vertices //Throwing Exceptions

    println("Stopping Spark Context")
    sc.stop()
  }
}

Now, I can access the graph and see the degrees of the nodes. But when I try to get some measures, such as Connected components, I am getting the following exceptions.

15/12/22 12:12:57 ERROR Executor: Exception in task 3.0 in stage 6.0 (TID 19)
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
    at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/12/22 12:12:57 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 17)
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
    at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Why am I nable to perform these operations on the generated graph using GraphX?

1 Answer 1

2

I found that, if I do the following the exception does not occur.

val graph = Graph(vertices, edges).partitionBy(PartitionStrategy.RandomVertexCut)

Apparently, some GraphX algorithms require the repartitioning. But the purpose is not entirely clear to me.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.