In Spark 1.6.0 (I am not very familiar with Spark and Scala), when I iterate over an RDD and append items to an ArrayBuffer, the buffer appears to be empty once the iteration is over.

// build an RDD of (original line, term-frequency vector of the 8th CSV field)
var testing = unlabeled.map { line =>
  val parts = line.split(',')
  val text = parts(7).split(' ')
  (line, htf.transform(text))
}
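Here htf is a HashingTF, and model (used below) is presumably a trained multinomial NaiveBayesModel: the logging class is called SelfLearningMNB$ and predictProbabilities is part of that API. A minimal sketch of the setup, which the question does not show (trainingData is hypothetical):

import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}

val htf = new HashingTF() // hashing term frequency, default 2^20 features
// trainingData: RDD[LabeledPoint] is hypothetical; the real training set is not shown
val model: NaiveBayesModel = NaiveBayes.train(trainingData, lambda = 1.0, modelType = "multinomial")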

// buffers declared on the driver
var lowPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]
var highPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]

for(counter <- 1 to 5){

  logger.info("this is the " + counter + " run -----------------")
  for (i <- testing) {
    val label = model.predict(i._2).toString
    //        logger.info(i._1.split(",")(7))
    //        logger.info(label)
    var probs = model.predictProbabilities(i._2)
    logger.info("prob 0 : " + probs(0))
    logger.info("prob 1 : " + probs(1))
    logger.info("--------------------- ")

    if (probs(0) <= 0.95 && probs(1) <= 0.95) {
      lowPropQueue += i
    } else {
      highPropQueue += ((i._1 + "," + label, i._2))
    }

    logger.info("size of high array : " + highPropQueue.length)
    logger.info("size of low array : " + lowPropQueue.length)

  }

  logger.info("passed: " + lowPropQueue.length)
  logger.info("NOT passed: " + highPropQueue.length)

  var xx=  sc.parallelize(highPropQueue).collect()
  var yy = sc.parallelize(lowPropQueue).collect()

  logger.info("passed: " + xx.length)
  logger.info("NOT passed: " + yy.length)
...
}

But based on the logs, the inner loop does seem to add elements to the arrays:

16/10/11 11:22:31 INFO SelfLearningMNB$: size of high array : 500
16/10/11 11:22:31 INFO SelfLearningMNB$: size of low array : 83
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 0 : 0.37094327822665185
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 1 : 0.6290567217733481
16/10/11 11:22:31 INFO SelfLearningMNB$: ---------------------
16/10/11 11:22:31 INFO SelfLearningMNB$: size of high array : 500
16/10/11 11:22:31 INFO SelfLearningMNB$: size of low array : 84
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 0 : 0.16872929936216619
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 1 : 0.8312707006378338

But when the inner loop ends I get this:

16/10/11 11:43:53 INFO SelfLearningMNB$: passed: 0
16/10/11 11:43:53 INFO SelfLearningMNB$: NOT passed: 0

What is going on?

EDIT

How can you obtain the data from the executors, or save data from the executors to HDFS so that it can be read from the master node later?

1 Answer

TL;DR: This cannot work in Spark.

What is going on?

  • Each executor gets its own copy of lowPropQueue and highPropQueue.
  • During iteration, those local copies are modified.
  • After iteration, the local copies are discarded; the driver's buffers are never updated.

FYI, a naive append to an ArrayBuffer is not thread-safe either.
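The pitfall can be reproduced in isolation. A minimal sketch, assuming sc is the SparkContext as in the question (run on a real cluster; in local mode closure behavior is not guaranteed):

import scala.collection.mutable

val buffer = mutable.ArrayBuffer[Int]() // lives on the driver
sc.parallelize(1 to 100).foreach { x =>
  buffer += x // appends to an executor-side copy of the closure's buffer
}
println(buffer.length) // 0 on the driver: its buffer was never touched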


3 Comments

I thought about that. But how is it possible to store the data from the executors in a "global" array?
You can try accumulators, but you'd need synchronized access, and looking at your code it just won't scale.
I did some searching, and this approach is definitely not appropriate for Spark. I had to map everything instead, but it worked (see the sketch below).
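For reference, a sketch of that transformation-based rewrite, under stated assumptions: testing, model, and the 0.95 threshold come from the question, while the output path and all other names are illustrative. Deriving the two datasets with filter also addresses the EDIT, since the results can be written to HDFS with saveAsTextFile:

// Split predictions into two RDDs with transformations instead of
// mutating driver-local buffers.
val predicted = testing.map { case (line, features) =>
  val label = model.predict(features).toString
  val probs = model.predictProbabilities(features)
  (line, features, label, probs)
}.cache() // reused by both filters below

val lowProp = predicted
  .filter { case (_, _, _, probs) => probs(0) <= 0.95 && probs(1) <= 0.95 }
  .map { case (line, features, _, _) => (line, features) }

val highProp = predicted
  .filter { case (_, _, _, probs) => probs(0) > 0.95 || probs(1) > 0.95 }
  .map { case (line, features, label, _) => (line + "," + label, features) }

// Bring (hopefully small) results back to the driver...
val xx = highProp.collect()
// ...or persist them so the master node can read them later (hypothetical path):
highProp.map(_._1).saveAsTextFile("hdfs:///user/someone/high-prop-output")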
