
I don't understand why splitting a Stream[String] produces a GC overhead limit exceeded error depending on whether str in Stream[String].flatMap{str => str.split(" ")} is invariant or randomly generated.

When str is invariant, no error occurs; when it is random, the error does occur.

I am not referencing objects in the looping blocks.

I use def to declare Streams in order to produce non-accumulating Streams.

Thanks for insights.

Here's my code:

import scala.util.Random

object DataOps{
  val randomGen:Random = new Random()
  def randomText:String = (0 to 300).map(x => randomGen.nextString(10)).mkString(" ")
  val text:String = Array.fill(300)(randomGen.nextString(10)).mkString(" ")

  // returns a Stream of strings that all reuse the same 'text: String'
  def infiniteInvariantDataStream(cnt:Int): Stream[String] = { 
    if (cnt>0) text#::infiniteInvariantDataStream(cnt-1)
    else Stream[String]()
  }

  //return a Stream of random string
  def infiniteDataStream(cnt:Int):Stream[String] = { 
    if (cnt>0) randomText#::infiniteDataStream(cnt-1)
    else Stream[String]()
  }
}

object BasicOps{
  def dummyStringStreamSplit(datastream: Stream[String]) = { 
      datastream
        .flatMap(txt => txt.split(" ")) 
        .foreach(word => word)
  }
}

object scalaOverflow extends App{

  val n_lines:Int = 1000000

  println("splitting looping over invariant text")
  def datastream1:Stream[String] = DataOps.infiniteInvariantDataStream(n_lines)
  BasicOps.dummyStringStreamSplit(datastream1)
  println("INVARIANT LINE SPLIT OK: no heap overflow")

  println("splitting looping over random text")
  def datastream3:Stream[String] = DataOps.infiniteDataStream(n_lines)
  BasicOps.dummyStringStreamSplit(datastream3)
  println("RANDOM LINE SPLIT OK: no heap overflow")

}

And here's the error:

splitting looping over invariant text
INVARIANT LINE SPLIT OK: no heap overflow
splitting looping over random text
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.String.valueOf(String.java:2840)
        at java.lang.Character.toString(Character.java:2136)
        at java.lang.String.valueOf(String.java:2826)
        at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:198)
        at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:350)
        at scala.collection.immutable.List.foreach(List.scala:383)
        at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:343)
        at scala.collection.AbstractTraversable.addString(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:309)
        at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:311)
        at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:313)
        at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
        at scala.util.Random.nextString(Random.scala:89)
        at DataOps$$anonfun$randomText$1.apply(scalaOverflow.scala:5)
        at DataOps$$anonfun$randomText$1.apply(scalaOverflow.scala:5)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at DataOps$.randomText(scalaOverflow.scala:5)
        at DataOps$.infiniteDataStream(scalaOverflow.scala:16)
        at DataOps$$anonfun$infiniteDataStream$1.apply(scalaOverflow.scala:16)
        at DataOps$$anonfun$infiniteDataStream$1.apply(scalaOverflow.scala:16)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1117)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1107)
        at scala.collection.immutable.Stream$$anonfun$flatMap$1.apply(Stream.scala:458)
        at scala.collection.immutable.Stream$$anonfun$flatMap$1.apply(Stream.scala:458)
        at scala.collection.immutable.Stream.append(Stream.scala:241)
        at scala.collection.immutable.Stream$$anonfun$append$1.apply(Stream.scala:241)

UPDATE

Actually, the reason for using a Stream comes from the method below. The whole point is to turn a Java while loop into a functional-friendly Stream:

import java.sql.{Connection, ResultSet, Statement, DriverManager}
def sqlStream(psqlResult: ResultSet, colname:String): Stream[(Int,String)] = {
    val state:Boolean = psqlResult.next()
    if (state && psqlResult.getString(colname) != null)
        (psqlResult.getRow(), psqlResult.getString(colname))#::sqlStream(psqlResult, colname)
    else if (state)
        sqlStream(psqlResult, colname)
    else
        Stream[(Int,String)]()
}

Should I have considered a better alternative?

Thanks.

1 Answer


The parameter datastream in dummyStringStreamSplit effectively acts like a val and maintains a reference to the beginning of the passed-in stream. This is what causes the unbounded memory use and the eventual GC overhead limit exceeded error.
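
To illustrate the memoization behind this, here is a minimal sketch, assuming Scala 2.x where scala.collection.immutable.Stream is available (it is deprecated in favor of LazyList from 2.13 on). The names StreamMemoSketch, countdown, built, buildsWithVal and buildsWithDef are hypothetical and not from the question. It does not reproduce the out-of-memory error; it only shows that a Stream keeps every cell it has already evaluated for as long as its head is reachable, whereas a def rebuilds the Stream on each call:

```scala
object StreamMemoSketch {
  // Hypothetical instrumentation: counts how many Stream cells get built.
  var built = 0

  // Finite Stream n, n-1, ..., 1; the tail is evaluated lazily.
  def countdown(n: Int): Stream[Int] =
    if (n > 0) {
      built += 1
      n #:: countdown(n - 1)
    } else Stream.empty

  // Traverse one held Stream twice: the second pass reuses memoized cells.
  def buildsWithVal(): Int = {
    built = 0
    val held = countdown(5) // the val keeps the head cell reachable
    held.foreach(_ => ())
    held.foreach(_ => ())
    built
  }

  // Traverse a def twice: every call builds a brand-new Stream.
  def buildsWithDef(): Int = {
    built = 0
    def fresh = countdown(5)
    fresh.foreach(_ => ())
    fresh.foreach(_ => ())
    built
  }

  def main(args: Array[String]): Unit = {
    println(buildsWithVal()) // 5
    println(buildsWithDef()) // 10
  }
}
```

With the held reference, the five cells are built once and all remain reachable during the second pass. Scaled up to a million long random strings, that kind of retention is what the explanation above blames for the heap filling up.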

There is really no safe way to write a method that takes a Stream and computes something from every element (rather than just returning a new Stream). At the very least, there is no way to guarantee that client code didn't pass you a Stream that is being held in a variable somewhere.

If you instead define dummyStringStreamSplit like this:

def dummyStringStreamSplit(datastream: Stream[String]) =
  datastream.flatMap(txt => txt.split(" "))

You can then do:

println("splitting looping over random text")
def datastream3:Stream[String] = DataOps.infiniteDataStream(n_lines)
def datastream3Split = BasicOps.dummyStringStreamSplit(datastream3)
datastream3Split.foreach(word => word)
println("RANDOM LINE SPLIT OK: no heap overflow")

And you won't get the GC overhead limit exceeded error.


2 Comments

Thanks for the answer. Actually, I don't understand why I cannot get the expected behavior even if I declare every streamed mapping inside the method with def. I have updated my post to explain why I am using def with Stream. Maybe I should have picked another approach?
Use Iterator instead. With Stream it is tricky to tell if you are going to blow the heap or not.
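
A minimal sketch of that Iterator approach, assuming the data only needs a single forward pass. IteratorSketch, splitWords and sqlIterator are hypothetical names; sqlIterator mirrors the sqlStream method from the question and has not been run against a real database:

```scala
import java.sql.ResultSet
import scala.util.Random

object IteratorSketch {
  private val randomGen = new Random()

  // Same shape as randomText in the question: random tokens joined by spaces.
  def randomText: String =
    (0 to 300).map(_ => randomGen.nextString(10)).mkString(" ")

  // Lazily split each line into words, one line at a time.
  def splitWords(lines: Iterator[String]): Iterator[String] =
    lines.flatMap(_.split(" "))

  // Hypothetical counterpart of sqlStream: yields (row number, value)
  // for every row whose column value is not null.
  def sqlIterator(rs: ResultSet, colname: String): Iterator[(Int, String)] =
    Iterator
      .continually(rs.next())   // advance the cursor only when asked
      .takeWhile(identity)      // stop once next() returns false
      .map(_ => (rs.getRow(), rs.getString(colname)))
      .filter { case (_, value) => value != null }

  def main(args: Array[String]): Unit = {
    // Kept small so the demo finishes quickly; the question uses 1000000.
    val nLines = 10000
    splitWords(Iterator.fill(nLines)(randomText)).foreach(word => word)
    println("RANDOM LINE SPLIT OK")
  }
}
```

An Iterator does not memoize its elements, so each line can be garbage collected once it has been split and consumed, no matter who holds a reference to the Iterator. The trade-off is that it can be traversed only once.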
