I have the following scenario:

  1. Thread A sends a number of tasks to both thread B and thread C. (The exact number of tasks is unknown.)

  2. Thread A sends each task to B and C at the same time (asynchronously). As soon as either B or C finishes the task successfully, or both have failed, A sends the next task. The idea is to avoid blocking as much as possible: for a given task, if B finishes while C is still processing, A can send the next task immediately and doesn't need to wait for C's result.

  3. The slower of B and C is expected to skip some tasks, as long as each task is done by the other. For example, B may finish tasks t1, t2, t3 and t4 while C finishes only t1 and t4, because C was still processing t1 when it received t2 and t3.

Is there a thread synchronization construct suitable for this? I looked at java.util.concurrent.Phaser, but it doesn't seem to fit my needs. Any comments are welcome; thanks in advance.

  • Why do both B and C have to start processing the same task, by the way? Do they process it somehow differently? If not, what is the point? Commented Jun 26, 2014 at 10:48
  • Is it a requirement that tasks are bound to threads? Kind of an academic exercise? Commented Jun 26, 2014 at 12:00
  • Yes, please give more context. I have the feeling you should probably not be using threads. Threads are messy and there are newer, and simpler, constructs to handle concurrency. Commented Jun 26, 2014 at 19:22
  • @ErikAllik the task has some external dependencies, and we have a dual setup for HA purpose but we only require any one of them succeed. Thanks! Commented Jun 27, 2014 at 3:02

3 Answers

2

This would be easier if you used Futures or actors, instead of raw threads, as the building block. Working directly on top of threads forces you to take care of low-level details yourself, such as queueing incoming messages. I'm also not sure that what you're asking for mirrors the problem you actually want to solve: where's the value in executing the same task twice, on two different threads? That just feels wrong.

Here's a naive non-blocking implementation to give an idea of what's involved, BUT DO NOT DO THIS in real code (really do consider higher-level abstractions):

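import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.util.control.NonFatal

val queue = new AtomicReference(Queue.empty[Runnable])

// remove `task` from the head of the queue, unless the other
// worker has already removed it
@tailrec
def removeIfHead(task: Runnable): Unit = {
  val currentQueue = queue.get
  if (currentQueue.headOption.exists(_ eq task) &&
      !queue.compareAndSet(currentQueue, currentQueue.tail))
    removeIfHead(task) // lost a race with submitTask; retry
}

def worker() = new Thread(new Runnable {
  def run(): Unit = loop()

  @tailrec
  private def loop(): Unit = {
    val currentQueue = queue.get
    if (currentQueue.nonEmpty) {
      // peek at the head without removing it, so that
      // both workers can attempt the same task
      val task = currentQueue.head
      try {
        task.run()
      } catch {
        case NonFatal(ex) =>
          ex.printStackTrace()
      }

      // whichever worker finishes first removes the task;
      // for the slower worker this is a no-op
      removeIfHead(task)
      // process next task in queue (exits once the queue is empty)
      loop()
    }
  }
})

@tailrec
def submitTask(task: Runnable): Unit = {
  val currentQueue = queue.get
  val newQueue = currentQueue.enqueue(task)

  if (!queue.compareAndSet(currentQueue, newQueue))
    submitTask(task)
  else if (currentQueue.isEmpty) {
    // because of the CAS above, only the submitter that takes the
    // queue from empty to non-empty starts a pair of workers; a slow
    // worker from an earlier burst may still be running, though
    worker().start()
    worker().start()
  }
}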
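
For comparison, the Future-based route recommended at the top of this answer could look roughly like the sketch below. It is in Java, since the question targets java.util.concurrent, and it is only an illustration: the class name EitherResult, the method firstSuccess, and the two single-thread executors standing in for B and C are made up for the example. The returned future completes as soon as either attempt succeeds and fails only once both attempts have failed (rule 2 of the question). It does not make the slower side skip queued tasks (rule 3).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of any library.
class EitherResult {

    // Completes with the first successful value, or exceptionally
    // once both attempts have failed.
    static <T> CompletableFuture<T> firstSuccess(CompletableFuture<T> b,
                                                 CompletableFuture<T> c) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger failures = new AtomicInteger();
        BiConsumer<T, Throwable> onDone = (value, error) -> {
            if (error == null) {
                result.complete(value);              // no-op if already completed
            } else if (failures.incrementAndGet() == 2) {
                result.completeExceptionally(error); // both attempts failed
            }
        };
        b.whenComplete(onDone);
        c.whenComplete(onDone);
        return result;
    }

    public static void main(String[] args) {
        // Assumption: one single-threaded executor plays the role of each worker.
        ExecutorService workerB = Executors.newSingleThreadExecutor();
        ExecutorService workerC = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> viaB = CompletableFuture.supplyAsync(
                    () -> { throw new IllegalStateException("B is down"); }, workerB);
            CompletableFuture<String> viaC = CompletableFuture.supplyAsync(
                    () -> "done via C", workerC);
            // join() returns once C succeeds, even though B failed
            System.out.println(firstSuccess(viaB, viaC).join());
        } finally {
            workerB.shutdown();
            workerC.shutdown();
        }
    }
}
```

Thread A would call firstSuccess once per task and wait on the result before sending the next one. The losing attempt is left running rather than cancelled.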

1

I would use a variation on the pattern from "Balancing Workloads Across Nodes with Akka 2": instead of assigning one work item per worker, it assigns the current work item to every worker that requests work, until that work item is completed.

It may seem like overkill but this pattern scales very well and is easy to customize. I have two projects in Production using it and there are others. This approach is a "pulling" approach.

A "pushing" approach using actors would require worker actors that only process the last message in their mailbox. The main actor would then forward all messages to all worker actors.
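
The "only process the last message" idea can be illustrated without Akka. The following is a minimal plain-Java sketch, not Akka's mailbox API: the class LatestOnlyMailbox and its methods offer and poll are hypothetical names. It keeps a single slot per worker, so a worker that falls behind silently drops older tasks, which is the skipping behavior asked for in point 3 of the question.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-slot mailbox: a newer message replaces any
// message the worker has not picked up yet.
class LatestOnlyMailbox<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    // Stores a message and returns the pending one it replaced (or null).
    T offer(T message) {
        return slot.getAndSet(message);
    }

    // Takes the most recent message, or returns null when nothing is pending.
    T poll() {
        return slot.getAndSet(null);
    }
}
```

The main thread would call offer on each worker's mailbox for every task, and each worker would call poll whenever it becomes idle. How an idle worker waits for the next message (spinning, parking, a semaphore) is left out of the sketch.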

0

Consider using an Executor, for example one created with Executors.newFixedThreadPool(). It gives you the following:

  • You can submit tasks that will be processed in the future.
  • Threads working on the tasks won't try to complete the same task.
  • The Future returned for each submitted task lets you check whether the task completed successfully, is still in progress, or failed.
  • The blocking mechanism you described in 2) can be achieved by relying on the fact that Future.get() blocks until the task has finished or failed.

I think these features cover the behavior requirements you wrote.
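
If cancelling the slower attempt is acceptable, ExecutorService.invokeAny already provides the "first success, or failure when every attempt fails" behavior from point 2 of the question. The sketch below shows one possible shape. The class FirstSuccessDemo, the method runOnEither, and the convention of returning null when both attempts fail are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper class for illustration only.
class FirstSuccessDemo {

    // Runs both attempts concurrently and returns the first successful
    // result, or null when both attempts throw.
    static String runOnEither(ExecutorService pool,
                              Callable<String> viaB,
                              Callable<String> viaC) throws InterruptedException {
        try {
            // invokeAny returns as soon as one task succeeds and
            // cancels the task that has not finished yet
            return pool.invokeAny(Arrays.asList(viaB, viaC));
        } catch (ExecutionException bothFailed) {
            return null; // arbitrary convention chosen for this sketch
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            String result = runOnEither(pool,
                    () -> { throw new IllegalStateException("B is down"); },
                    () -> "done via C");
            System.out.println(result);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because invokeAny interrupts the losing task, this fits best when the tasks respond to interruption and an abandoned attempt has no side effects that need to finish.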
