17

I have an iterable vals: Iterable[T] and a long-running function with no relevant side effects, f: (T => Unit). Right now it is applied to vals in the obvious way:

vals.foreach(f)

I would like the calls to f to be done concurrently (within reasonable limits). Is there an obvious function somewhere in the Scala base library? Something like:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

While f is reasonably long-running, it is short enough that I don't want the overhead of creating a new thread for each call, so I am looking for something based on a thread pool.

7 Answers

24

Many of the answers from 2009 still use the old scala.actors.Futures._, which is no longer available in recent Scala versions. While Akka is the preferred way, a much more readable way is to just use parallel (.par) collections (since Scala 2.13 these live in the separate scala-parallel-collections module):

vals.foreach { v => f(v) }

becomes

vals.par.foreach { v => f(v) }
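
The question asked for a bounded number of threads, and a plain .par call uses the default pool. Here is a minimal sketch of capping it, assuming Scala 2.13 with the scala-parallel-collections module on the classpath (on 2.12, drop the CollectionConverters import). The names ParForeachDemo and parForeach are made up for illustration and are not part of any library:

```scala
import java.util.concurrent.ForkJoinPool
// Assumption: Scala 2.13+, where .par comes from the separate
// scala-parallel-collections module via this import.
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport

object ParForeachDemo {
  // Hypothetical helper: applies f to every element of vals using at most
  // `threads` worker threads, and returns once all calls have finished.
  def parForeach[T](threads: Int)(vals: Iterable[T], f: T => Unit): Unit = {
    val pool = new ForkJoinPool(threads)
    try {
      val parVals = vals.par
      // Replace the default task support with one backed by our own pool.
      parVals.tasksupport = new ForkJoinTaskSupport(pool)
      parVals.foreach(f)
    } finally pool.shutdown()
  }
}
```

Usage would then look like ParForeachDemo.parForeach(8)(vals, f), which is close to the Concurrent.foreach(8)(vals, f) shape from the question.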

Alternatively, Scalaz's parMap can be more succinct, with the caveat that you need to remember the usual Scalaz imports. As usual, there's more than one way to do the same thing in Scala!


13

Scalaz has parMap. You would use it as follows:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

This will equip every functor (including Iterable) with a parMap method, so you can just do:

vals.parMap(f)

You also get parFlatMap, parZipWith, etc.


10

I like the Futures answer. However, while it will execute concurrently, it will also return asynchronously (that is, before the calls to f have finished), which is probably not what you want. The correct approach is to create all the futures first and then block on each one:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }
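
Since scala.actors is gone from current Scala, here is a rough equivalent using scala.concurrent.Future from the standard library (available since 2.10) on a fixed-size thread pool. This is a sketch of the same idea, not the original code; the names ConcurrentForeach and foreach are hypothetical and simply mirror the signature the question asked for:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ConcurrentForeach {
  // Hypothetical helper: runs f on every element of vals using a pool of
  // `threads` threads, and blocks until all calls have completed.
  def foreach[T](threads: Int)(vals: Iterable[T], f: T => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // toList forces a strict collection, so every future is submitted
      // up front even if vals itself is lazy.
      val futures = vals.toList.map(v => Future(f(v)))
      // Block until all futures are done; rethrows if any call to f failed.
      Await.result(Future.sequence(futures), Duration.Inf)
      ()
    } finally pool.shutdown()
  }
}
```

With that in place, ConcurrentForeach.foreach(8)(vals, f) starts the work concurrently and only returns after every call to f has finished.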

6 Comments

Be careful that vals is a strict collection -- if it's lazy (and in Scala 2.7 this includes the Range class), the futures won't be created until each one is needed by foreach, and nothing will happen in parallel.
I suppose we could solve that problem by injecting another foreach call between the map and the current foreach. Thus: vals map { x => future { f(x) } } foreach { x => x } foreach { _() }
That would be a map we have to inject, not another foreach? And it is not clear to me that the map of a lazy collection is strict. The safest way may be to call toArray.
You're right, foreach was (obviously) the wrong thing to inject since it returns Unit. My bad! :-) The map function on lazy collections is almost always non-strict, so we can either call toList (or toArray), or we can project and then force: (vals map { x => future { f(x) } } projection).force foreach { _() }. I don't know whether that's better than simply toList, but it is certainly different.
What does it mean when you say it "returns asynchronously"? Does it imply that it is non-blocking? (and why would that be a problem?)
3

I had some issues using scala.actors.Futures in Scala 2.8 (it was buggy when I checked). Using the Java libraries directly worked for me, though:

object Parallel {
  import java.util.{Timer, TimerTask}
  import java.util.concurrent.{Executors, TimeUnit}

  val cpus: Int = Runtime.getRuntime.availableProcessors

  // Runs op once on a timer thread after ms milliseconds.
  def afterDelay(ms: Long)(op: => Unit): Unit =
    new Timer().schedule(new TimerTask { override def run(): Unit = op }, ms)

  // Runs f(0) .. f(n - 1) on a thread pool and blocks until all calls have finished.
  def repeat(n: Int, f: Int => Unit): Unit = {
    val e = Executors.newCachedThreadPool() // or newFixedThreadPool(cpus + 1) to bound the thread count
    (0 until n).foreach(i => e.execute(new Runnable { def run(): Unit = f(i) }))
    e.shutdown()
    e.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}


2

I'd use scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t)))


2

The latest release of Functional Java has some higher-order concurrency features that you can use.

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

And then...

par.parMap(vals, f)

Remember to shut down the pool when you are done.


0

You can use the Parallel Collections from the Scala standard library. They're just like ordinary collections, but their operations run in parallel. You just need to call par on the collection before invoking an operation on it.

import scala.collection._

val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString
