
I am taking the Parallel Programming course, and it presents the following parallel interface:

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val tb = task {taskB}
  val ta = taskA
  (ta, tb.join())
}

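whereas the following is wrong, because taskB is joined immediately after it is forked, so the two computations run one after the other: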

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val ta = taskA
  val tb = task {taskB}.join()
  (ta, tb)
}
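To experiment with both versions outside the course, here is a minimal sketch. It assumes a hypothetical stand-in for the course's task function built directly on java.util.concurrent.ForkJoinPool (the course library's own implementation may differ), and the names ParallelSketch, sequentialByMistake, slow and millis are made up for illustration.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask, RecursiveTask}

object ParallelSketch {
  // Hypothetical stand-in for the course's `task`: schedules `body` on a
  // shared ForkJoinPool and returns a handle whose join() waits for it.
  private val pool = new ForkJoinPool()

  def task[T](body: => T): ForkJoinTask[T] = {
    val t = new RecursiveTask[T] { def compute(): T = body }
    pool.execute(t)
    t
  }

  // Parallel: taskB is forked first, taskA runs in the calling thread
  // meanwhile, and only then do we wait for taskB.
  def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
    val tb = task { taskB }
    val ta = taskA
    (ta, tb.join())
  }

  // Sequential by mistake: taskA finishes before taskB is even forked,
  // and join() is called right after the fork.
  def sequentialByMistake[A, B](taskA: => A, taskB: => B): (A, B) = {
    val ta = taskA
    val tb = task { taskB }.join()
    (ta, tb)
  }

  // Helper for timing experiments: sleeps for `ms`, then returns `value`.
  def slow[T](ms: Long, value: T): T = { Thread.sleep(ms); value }

  // Wall-clock milliseconds spent evaluating `body`.
  def millis(body: => Any): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }
}
```

With two 300 ms tasks, millis(parallel(slow(300, 1), slow(300, 2))) should come out at roughly 300 ms, while the same call with sequentialByMistake takes at least 600 ms.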

See the full interface at https://gist.github.com/ChenZhongPu/fe389d30626626294306264a148bd2aa

It also shows the right way to execute four tasks:

def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
    val ta = task { taskA }
    val tb = task { taskB }
    val tc = task { taskC }
    val td = taskD
    (ta.join(), tb.join(), tc.join(), td)
}

My question: if I don't know the number of tasks in advance (for example, a List of tasks), how can I call join on each task correctly?

tasks.map(_.join()) // wrong

Edit

A similar discussion also takes place in the course forum thread "Discuss this week's module: Parallel Programming".

  • Note: map returns a new collection with each element transformed by the function. Maybe you need the transformation without creating the new collection? Commented Aug 2, 2016 at 5:30
  • @jwvh how would I do that? Commented Aug 2, 2016 at 5:32
  • How about just using foreach since you are not going to return anything? Commented Aug 2, 2016 at 7:12
  • @sebszyller using foreach would also execute those tasks sequentially, not in parallel. Commented Aug 2, 2016 at 8:02
  • @chenzhongpu I meant joining them, not the actual execution. Commented Aug 2, 2016 at 8:07

3 Answers


Using the framework from the Parallel Programming course

You can implement the method like this:

def parallel[A](tasks: (() => A)*): Seq[A] = {
  if (tasks.isEmpty) Nil
  else {
    val pendingTasks = tasks.tail.map(t => task { t() })
    tasks.head() +: pendingTasks.map(_.join())
  }
}

(Note that Scala doesn't allow a variable number of by-name arguments, which is why each task is passed as a () => A function; this may change in the future.)
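Because task comes from the course library, the method above won't compile on its own. A self-contained sketch, assuming a hypothetical ForkJoinPool-based stand-in for task (not the course's code) and a made-up object name, could look like this:

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask, RecursiveTask}

object VarargsParallelSketch {
  // Hypothetical stand-in for the course's `task` (not the course code):
  // runs `body` on a shared ForkJoinPool.
  private val pool = new ForkJoinPool()

  def task[T](body: => T): ForkJoinTask[T] = {
    val t = new RecursiveTask[T] { def compute(): T = body }
    pool.execute(t)
    t
  }

  // Same shape as the answer's method: fork every task but the first,
  // run the first in the calling thread, then join the forked ones in order.
  def parallel[A](tasks: (() => A)*): Seq[A] =
    if (tasks.isEmpty) Nil
    else {
      // toList forces the map, so all tasks are forked before any join.
      val pending = tasks.tail.toList.map(t => task { t() })
      // The left operand of +: is evaluated first, so head() runs here
      // while the forked tasks are already in progress.
      tasks.head() +: pending.map(_.join())
    }
}
```

The toList call is a small addition to the answer's version: it makes sure every task is forked before the first join, even if the caller passes a lazy sequence such as a Stream or LazyList as varargs.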

And then use it like this:

object ParallelUsage {
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()

    // Use a list of tasks:
    val tasks = List(longTask _, longTask _, longTask _, longTask _)
    val results = parallel(tasks: _*)
    println(results)

    // or pass any number of individual tasks directly:
    println(parallel(longTask, longTask, longTask))
    println(parallel(longTask, longTask))
    println(parallel(longTask))
    println(parallel())

    println(s"Done in ${ System.currentTimeMillis() - start } ms")
  }

  def longTask() = {
    println("starting longTask execution")
    Thread.sleep(1000)
    42 + Math.random
  }
}

Using Scala's parallel collections

You can't go simpler than this:

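// Scala 2.13+: add the scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
val tasks = Vector(longTask _, longTask _, longTask _)
val results = tasks.par.map(_()).seq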

Looking around for a practical way to build parallel(), I found it can be built from Future. The paradigm will seem familiar to anyone using modern JavaScript Promises:

import scala.concurrent.{Await,Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

def parallel[A, B](taskA: =>A, taskB: =>B): (A,B) = {
  val fB:Future[B] = Future { taskB }
  val a:A = taskA
  val b:B = Await.result(fB, Duration.Inf)
  (a,b)
}

This spins off taskB onto another thread (from the global execution context) and runs taskA in the calling thread. Once taskA is done, we wait, forever if necessary, for fB to finish. Beware: I haven't tested exceptions with this setup, and it might stall or misbehave.
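The same Future-based idea extends to a list of tasks whose length isn't known in advance. Here is a minimal sketch, assuming the tasks are given as () => A functions; parallelAll is a made-up name, while Future.sequence is the standard library method that turns a collection of futures into a future of a collection.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object FutureParallelSketch {
  // Start every thunk as a Future first, then wait once for all of them.
  def parallelAll[A](tasks: Seq[() => A]): Seq[A] = {
    val started: Seq[Future[A]] = tasks.map(t => Future(t()))
    // Future.sequence preserves the input order of the results.
    Await.result(Future.sequence(started), Duration.Inf)
  }
}
```

All futures are created, and therefore started, by the map before anything is awaited. If a task throws, Await.result rethrows the failure instead of returning a result.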


This is inspired by Future.sequence and cheats a bit: you need a Task implementation that is also a monad (one that defines map and flatMap) for this design to work.

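  // Scala 2.12 and earlier: CanBuildFrom was removed in Scala 2.13.
  // Task is assumed to provide point, map and flatMap.
  import scala.collection.generic.CanBuildFrom
  import scala.concurrent.ExecutionContext

  /** Transforms a `TraversableOnce[Task[A]]` into a `Task[TraversableOnce[A]]`.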
   *  Useful for reducing many `Task`s into a single `Task`.
   */
  def parallel[
    A,
    M[X] <: TraversableOnce[X]
  ](in: M[Task[A]])(
    implicit cbf: CanBuildFrom[M[Task[A]], A, M[A]],
    executor: ExecutionContext
  ): Task[M[A]] = {
    in.foldLeft(Task.point(cbf(in))) {
      (fr, fa) => for (r <- fr; a <- fa) yield (r += a)
    }.map(_.result())(executor)
  }

This can run operations in parallel for most Scala collections. The only requirement is that Task defines map and flatMap, whatever its implementation; the particular collection type is abstracted away through the implicit CanBuildFrom builder construct that the Scala collections library uses internally.
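Since CanBuildFrom only exists up to Scala 2.12, here is a version-independent sketch of the same foldLeft-over-flatMap idea. It assumes Future in the role of the monadic Task and List as the collection; sequenceList and runAll are hypothetical names, not library methods.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object FoldSequenceSketch {
  // Fold a list of futures into a future of a list by chaining flatMap/map.
  // Results are prepended and reversed at the end to keep the input order.
  def sequenceList[A](in: List[Future[A]]): Future[List[A]] =
    in.foldLeft(Future.successful(List.empty[A])) { (acc, fa) =>
      for (r <- acc; a <- fa) yield a :: r
    }.map(_.reverse)

  // Futures start running as soon as they are created, so every thunk is
  // already executing before the fold starts chaining them together.
  def runAll[A](thunks: List[() => A]): List[A] =
    Await.result(sequenceList(thunks.map(t => Future(t()))), Duration.Inf)
}
```

The fold itself only sequences the waiting. With a lazily evaluated task type, the same fold would run the tasks one after another, since each flatMap step starts only after the previous one completes.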
