
I am trying to execute the following lines of code, but on a much larger RDD. Apparently, I get a heap-space error when a is very large. How can I make this work? p is usually small.

val p = Array("id1", "id3", "id2")
val a = sc.parallelize(Array(("id1", ("1", "1")), ("id4", ("4", "4")), ("id2", ("2", "2"))))
val f = a.filter(x => p contains x._1)
println(f.collect().mkString(";"))
  • Try broadcasting p. I don't know if this will solve your problem as it seems to be tied to the size of a, but it is good practice in general. Commented Feb 27, 2015 at 16:55
  • Nice idea. So I did val p_b = sc.broadcast(p); val f = a.filter(x => p_b contains x._1), but unfortunately I get the following error: <console>:62: error: value contains is not a member of org.apache.spark.broadcast.Broadcast[Array[String]]. Commented Feb 27, 2015 at 17:16
  • It should be p_b.value, because you have to dereference the broadcast variable; see the sketch after these comments. Commented Feb 27, 2015 at 18:19
  • If your use case is a contains check, use a Set, which optimizes that operation. Commented Feb 28, 2015 at 12:25
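
Putting the comments together, a minimal sketch of the corrected broadcast usage (same sc, p, and a as in the question):

// Broadcast the small array once to every executor
val p_b = sc.broadcast(p)
// Dereference the broadcast with .value inside the closure
val f = a.filter(x => p_b.value contains x._1)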

1 Answer


The problem here is not the filter or the small array, but the attempt to collect a large RDD, which effectively sends all the data to the driver and will likely exhaust the driver's available memory.

What happens to the string afterwards? What's probably needed is another method to store the results of the filter computation.
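
For example, the filtered RDD could be written out in parallel, or only a small sample brought back to the driver; a sketch (the output path here is just a placeholder):

// Write the filtered results to distributed storage instead of the driver
f.saveAsTextFile("hdfs:///tmp/filtered-output")
// Or pull back just a few records for inspection
f.take(10).foreach(println)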

Another note: if the main use case of the small dataset is contains, consider using a Set instead of an Array, as contains is amortized O(1) on Sets and O(n) on arrays.
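
A minimal sketch of that change, reusing p and a from the question:

// Set membership is effectively constant time, vs an O(n) scan per element on an Array
val pSet = p.toSet
val f = a.filter(x => pSet.contains(x._1))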
