
I am doing a Spark project. In the following code, I have a string that I use to collect my results in order to write them to a file later on (I know this is not the correct way; I am just checking what is inside a Tuple3 returned by a method). The string gets truncated after a for-each loop. Here is the relevant part of my code:

val newLine = sys.props("line.separator") // also tried "\n". I am using OS X.

var str = s"*** ${newLine}"

for (tuple3 <- ArrayOfTuple3s) {
  for (list <- tuple3._3) {
    for (strItem <- list) {
      str += s"${strItem}, "
    }
    str += s"${newLine}"
  }
  str += s"${newLine}"
  println(tempStr)
}

print("str=" + str)

The first println method call prints the correct value of the string (the concatenated result), but when the loop ends, the value of str is *** (the same value assigned to it before the first loop).

Edit: I replaced the immutable String object str with a StringBuilder, but the result did not change:

val newLine: String = sys.props("line.separator")

var str1: StringBuilder = new StringBuilder(15000)

for (tuple3 <- ArrayOfTuple3s) {
  for (list <- tuple3._3) {
    for (str <- list) {
      str1.append(s"${str}, ")
    }
    str1.append(s"${newLine}")
  }
  str1.append(s"${newLine}")
  println(str1.toString())
}

print("resulting str1=" + str1.toString())

Edit 2: I mapped the RDD to take the Tuple3's third field directly; the mapped RDD is an RDD of arrays of lists. I changed the code accordingly, but I am still getting the same result (the resulting string is empty outside the loop, although inside the for loop it is not).

val rddOfArraysOfLists = getArrayOfTuple3s(mainRdd).map(_._3)

for (arrayOfLists <- rddOfArraysOfLists) {
  for (list <- arrayOfLists) {
    for (field <- list) {
      str1.append(s"${field}, ")
    }
    str1.append(" -- ")
  }
  str1.append(s"${newLine}")
  println(str1.toString())
}

Edit 4: I think the problem is not with strings at all. There seems to be a problem with all types of variables.

var count = 0

for (arrayOfLists <- myArray) {
  count = arrayOfLists.last(3).toInt
  println(s"count=$count")
}

println(s"count=$count")

The value is non-zero inside the loop, but it is 0 outside the loop. Any idea?

Edit 5: I cannot publish the whole code (due to confidentiality restrictions), but here is the major part of it. If it matters, I am running Spark on my local machine in IntelliJ IDEA (for debugging).

System.setProperty("spark.cores.max", "8")
System.setProperty("spark.executor.memory", "15g")    
val sc = new SparkContext("local", getClass.getName)            
val samReg = sc.objectFile[Sample](sampleLocation, 200).distinct

val samples = samReg.filter(f => f.uuid == "dce03545e8034242").sortBy(_.time).cache()

val top3Samples = samples.take(3)
for (sample <- top3Samples) {
  print("sample: ")
  println(s"uuid=${sample.uuid}, time=${sample.time}, model=${sample.model}")
}

val firstTimeStamp = samples.first.time
val targetTime = firstTimeStamp + 2592000 // + 1 month in seconds (samples during the first month)

val rddOfArrayOfSamples = getCountsRdd(samples.filter(_.time <= targetTime)).map(_._1).cache()
// Due to confidentiality matters, I cannot reveal the code, 
// but here is a description:
// I have an array of samples. Each sample has a few String fields 
// and is represented by a List[String]
// The above RDD is of the type RDD[Array[List[String]]]. 
// It contains only a single array of samples
// (because I passed a filtered set of samples to the function), 
// but it may contain more.
// The fourth field of each sample (list) is an increasing number (count)

println(s"number of arrays in the RDD: ${rddOfArrayOfSamples.count()}")

var maxCount = 0
for (arrayOfLists <- rddOfArrayOfSamples) {
  println(s"Last item of the array (a list)=${arrayOfLists.last}")
  maxCount = arrayOfLists.last(3).toInt
  println(s"maxCount=${maxCount}")
}
println(s"maxCount=${maxCount}")

The output:

sample: uuid=dce03545e8034242, time=1360037324, model=Nexus 4
sample: uuid=dce03545e8034242, time=1360037424, model=Nexus 4
sample: uuid=dce03545e8034242, time=1360037544, model=Nexus 4
number of arrays in the RDD: 1
Last item of the array (a list)=List(dce03545e8034242, Nexus 4, 1362628767, 32, 2089, 0.97, 0.15999999999999992, 0)
maxCount=32
maxCount=0

  • The first cannot be the code you actually run (as it mentions tempStr in the println). Can you post a complete example? Most likely you have two things named str1. Commented Jan 16, 2015 at 7:53
  • Also, I think your loops could be replaced with ArrayOfTuple3s.map(t => t._3.mkString("", ", ", newLine)).mkString("", newLine, "") (roughly as in the sketch after these comments). Commented Jan 16, 2015 at 8:11
  • Thank you @Paul. The naming is correct in my main code; I did a partial rename when I was creating this post (to make the name shorter and less distracting), but in my main code all of the names are identical (tempStr). Commented Jan 16, 2015 at 22:27
  • Regarding mkString, I think I cannot use it, because my RDD is not flat. See the code in my last edit. The third element of the tuple is itself an RDD (list) of arrays of lists. Commented Jan 16, 2015 at 22:53
  • 1
    So I don't really know the architecture of Spark, but what I think is happening is: in general the map (that the for translates into) can execute in multiple nodes. So there can't really be one maxCount that's shared by all executions of the body of the for/map. So the maxCount you're modifying isn't the same maxCount you're declaring. Imagine if there were many lines in your rddOfArrayOfSamples, and the map/for was distributed across nodes, and for every row, a single/shared maxCount was updated - it would be fairly random which one got updated. Commented Jan 19, 2015 at 16:02
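For a plain, in-memory Array of Tuple3s (not an RDD), the nested string-building loops can indeed collapse into mkString calls. Here is a minimal sketch; the tuple shape and the sample values are made up, not the asker's real data:

val newLine = sys.props("line.separator")

// Stand-in data: only the third element (a collection of List[String]) matters here
val arrayOfTuple3s = Array(
  (1, "a", Array(List("x1", "x2"), List("y1", "y2"))),
  (2, "b", Array(List("z1", "z2")))
)

// Join items of each list with ", ", lists with newlines, and tuples with blank lines,
// roughly reproducing what the nested loops accumulate in str
val str = arrayOfTuple3s
  .map(_._3.map(_.mkString(", ")).mkString(newLine))
  .mkString(s"***$newLine", newLine + newLine, newLine)

println(str)

This only works once the data is a local collection; for an RDD you would first have to collect (or otherwise reduce) it to the driver.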

2 Answers


Promoting my explanation from a comment to an answer:

See this answer to a somewhat-related question:

Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others), your transformation code (closure) is:

serialized on the driver node,
shipped to the appropriate nodes in the cluster,
deserialized,
and finally executed on the nodes

The for (without a yield) in your code is just syntactic sugar for a foreach on the RDD.

Because of this, the maxCount that each execution of the closure updates is not the same maxCount in your driver program. That one never changes.

The lesson here is: don't use closures (blocks passed to RDD operations) that update vars declared outside the block.
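Applied to the maxCount problem, the idiomatic fix is to express the result as an RDD action (max or reduce) and let Spark bring the value back to the driver, instead of mutating a var inside the closure. A minimal, self-contained sketch; the sample data below is made up, standing in for the confidential RDD[Array[List[String]]]:

import org.apache.spark.SparkContext

val sc = new SparkContext("local", "maxCountSketch")

// Made-up stand-in for rddOfArrayOfSamples: one array of "samples",
// where each sample is a List[String] and index 3 holds the increasing count
val rddOfArrayOfSamples = sc.parallelize(Seq(
  Array(List("dce03545e8034242", "Nexus 4", "1360037324", "17"),
        List("dce03545e8034242", "Nexus 4", "1362628767", "32"))
))

// Wrong: foreach { maxCount = ... } only updates an executor-side copy.
// Right: compute per-element values with map, then use an action (max/reduce)
// so the result is returned to the driver.
val maxCount = rddOfArrayOfSamples
  .map(arrayOfLists => arrayOfLists.last(3).toInt) // last sample's count in each array
  .max()

println(s"maxCount=$maxCount") // 32

sc.stop()

For non-numeric values, pass an appropriate Ordering to max(), or use reduce, as suggested in the comments below.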


5 Comments

I am wondering why this is happening even when I am running Spark on a single node (my local machine). Perhaps even when you run Spark on a single machine there are two distinct threads/processes, a master process and a worker process, which use different segments of memory and don't share it?
By the way, then what approach do you suggest for handling this problem?
Of course one approach (or more accurately, a hack) that I used was to write the value I am fetching into a file, inside the closure/block.
If your use case is to calculate the max, map the RDD to just the counts, then use reduce. Or call .max(), passing in an appropriate Ordering.
Wonderful! I also found this useful: spark.apache.org/docs/1.2.0/…

Since you didn't post a complete example, I had to improvise some portion of the code.

For your 4th edit I made:

val myArray = Array(
  List(List(0, 0, 0, 0), List(0, 0, 0, 0), List(0, 0, 0, 0)),
  List(List(1, 1, 1, 1), List(1, 1, 1, 1), List(1, 1, 1, 1)),
  List(List(2, 2, 2, 2), List(2, 2, 2, 2), List(2, 2, 2, 2))
)

Running in the REPL:

var count = 0

for (arrayOfLists <- myArray) {
  count = arrayOfLists.last(3).toInt
  println(s"count=$count")
}

println(s"count=$count")

I get:

scala> for (arrayOfLists <- myArray) {
     |   count = arrayOfLists.last(3).toInt
     |   println(s"count=$count")
     | }
count=0
count=1
count=2

scala> println(s"count=$count")
count=2

The value set inside the loop is retained after the loop ends: non-zero inside, non-zero outside.

If you post a complete example, maybe we can help you more.

1 Comment

Thank you for your answer, but I still have the same problem. Please see my last edit.
