3

Background

I have been reading the book Functional Programming in Scala, and have some questions regarding the content in Chapter 7: Purely functional parallelism.

Here is the code for the answers in the book: Par.scala, but I am confused about certain part of it.

Here is the first part of the code of Par.scala, which stands for Parallelism:

import java.util.concurrent._

object Par {
  type Par[A] = ExecutorService => Future[A]

  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  private case class UnitFuture[A](get: A) extends Future[A] {
    def isDone = true
    def get(timeout: Long, units: TimeUnit): A = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get, bf.get))
    }

  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) => es.submit(new Callable[A] {
      def call: A = a(es).get
    })

  def lazyUnit[A](a: => A): Par[A] =
    fork(unit(a))

  def run[A](es: ExecutorService)(a: Par[A]): Future[A] = a(es)

  def asyncF[A, B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

  def map[A, B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a, _) => f(a))
}
  • The simplest possible model for Par[A] might be ExecutorService => Future[A], and run simply returns the Future.
  • unit promotes a constant value to a parallel computation by returning a UnitFuture, which is a simple implementation of Future that just wraps a constant value.
  • map2 combines the results of two parallel computations with a binary function.
  • fork marks a computation for concurrent evaluation. The evaluation won’t actually occur until forced by run. Here is with its simplest and most natural implementation of it. Even though it has its problems, let's first put them aside.
  • lazyUnit wraps its unevaluated argument in a Par and marks it for concurrent evaluation.
  • run extracts a value from a Par by actually performing the computation.
  • asyncF converts any function A => B to one that evaluates its result asynchronously.

Questions

The fork is the function confuses me a lot here, because it takes a lazy argument, which will be evaluated later when it is called. Then my questions are more about when we should use this fork, i.e., when we need lazy-evaluation and when we need to have the value directly.

Here is an exercise from the book:

EXERCISE 7.5 Hard: Write this function, called sequence. No additional primitives are required. Do not call run.

def sequence[A](ps: List[Par[A]]): Par[List[A]]

And here is the answers (offered here).


First

  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldRight[Par[List[A]]](unit(List()))((h, t) => map2(h, t)(_ :: _))

What is the different between above code and the following:

  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldLeft[Par[List[A]]](unit(List()))((t, h) => map2(h, t)(_ :: _))

Additionally

  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
    }

  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l,r) = as.splitAt(as.length/2)
      map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
    }
  }

In sequenceRight, fork is used when recursive function is directly called. However, in sequenceBalanced, fork is used outside of the whole function body.

Then, what is the differences or above code and the following (where we switched the places of fork):

  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = fork {
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, sequenceRight(t))(_ :: _)
    }
  }

  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] =
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l,r) = as.splitAt(as.length/2)
      map2(fork(sequenceBalanced(l)), fork(sequenceBalanced(r)))(_ ++ _)
    }

Finally, given the sequence defined above, we have the following function:

  def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
    val fbs: List[Par[B]] = ps.map(asyncF(f))
    sequence(fbs)
  }

I would like to know, can I also implement the function in the following way, which is by applying the lazyUnit defined in the beginning? Is this implementation lazyUnit(ps.map(f)) lazy?

  def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
    lazyUnit(ps.map(f))

1 Answer 1

1

I did not completely understand your doubt. But I see a major problem with the following solution,

def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
  lazyUnit(ps.map(f))

To understand the problem lets look at def lazyUnit,

def fork[A](a: => Par[A]): Par[A] =
  (es: ExecutorService) => es.submit(new Callable[A] {
    def call: A = a(es).get
  })

def lazyUnit[A](a: => A): Par[A] =
  fork(unit(a))

So... lazyUnit takes an expression of type => A and submits it to ExecutorService to get evaluated. And returns the wrapped result of this parallel computation as Par[A].

In parMap for every element of ps: List[A], we not only have to evaluate the corresponding mapping using the function f: A => B but we have to do these evaluations in parallel.

But our solution lazyUnit(ps.map(f)) will submit the whole { ps.map(f) } evaluation as a single task to our ExecutionService. Which means we are not doing it in parallel.

What we need to do is make sure that for each element a in ps: [A], the function f: A => B is executed as a separate task for our ExecutorService.

Now, as we learned from our implementation is that we can run an expression of type exp: => A by using lazyUnit(exp) to get a result: Par[A].

So, we will do exactly that for every a: A in ps: List[A],

val parMappedTmp = ps.map( a => lazyUnit(f(a) ) )

// or

val parMappedTmp = ps.map( a => asyncF(f)(a) )

// or

val parMappedTmp = ps.map(asyncF(f))

But, Now our parMappedTmp is a List[Par[B]] and whereas we needed a Par[List[B]]

So, you will need a function with the following signature to get what you wanted,

def sequence[A](ps: List[Par[A]]): Par[List[A]]

Once you have it,

val parMapped = sequence(parMappedTmp)
Sign up to request clarification or add additional context in comments.

5 Comments

I don't think you have to put () => f(a) instead of f(a) for a lazy argument. For example, a function like def f(p: => Int, eval : Boolean) = if (eval) println(p), you can call if like f(3/0, false), instead of f(() => 3/0, false)
I think this is the purpose of lazy argument evaluation.
Yup. I think you are correct about that. But rest of the explanation still holds.
could you please modify the first part of your answer? then let's further discuss the rest and try to make it clear.
Thanks. Your answer answered my Finally question. Could you please also address the First and Additionally questions?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.