My goal is to count the sum of elements in a binary tree using ExecutorService in Java and then collect the result for each task using the CompletionService.
The user gives the tree height, the level at which the parallelism should start, and the number of threads to be used. I know that the ExecutorService should spawn exactly the same number of threads the user has given it and the completion service should spawn exactly N number of tasks in the preProcess method, where N is 2^(level of parallelism), since at a certain level, n, we are going to have 2^n nodes.
My problem is that I don't know how to start traversing the tree from a given height and how to use the CompletionService to gather the results in my postProcess method. Also each time a new task is spawned, the number of total tasks is increased by one and each time the CompletionService has got a result back, the number of tasks should be reduced by one.
I was able to use the CompletionService in the processTreeParallel function, but I really don't understand how I can use it in my postProcess method.
Here is my code:
import java.util.concurrent.*;
public class TreeCalculation {
// tree level to go parallel
int levelParallel;
// total number of generated tasks
long totalTasks;
// current number of open tasks
long nTasks;
// total height of tree
int height;
// Executors
ExecutorService exec;
CompletionService<Long> cs;
TreeCalculation(int height, int levelParallel) {
this.height = height;
this.levelParallel = levelParallel;
}
void incrementTasks() {
++nTasks;
++totalTasks;
}
void decrementTasks() {
--nTasks;
}
long getNTasks() {
return nTasks;
}
// Where the ExecutorService should be initialized
// with a specific threadCount
void preProcess(int threadCount) {
exec = Executors.newFixedThreadPool(threadCount);
cs = new ExecutorCompletionService<Long>(exec);
nTasks = 0;
totalTasks = 0;
}
// Where the CompletionService should collect the results;
long postProcess() {
long result = 0;
return result;
}
public static void main(String[] args) {
if (args.length != 3) {
System.out.println(
"usage: java Tree treeHeight levelParallel nthreads\n");
return;
}
int height = Integer.parseInt(args[0]);
int levelParallel = Integer.parseInt(args[1]);
int threadCount = Integer.parseInt(args[2]);
TreeCalculation tc = new TreeCalculation(height, levelParallel);
// generate balanced binary tree
Tree t = Tree.genTree(height, height);
//System.gc();
// traverse sequential
long t0 = System.nanoTime();
long p1 = t.processTree();
double t1 = (System.nanoTime() - t0) * 1e-9;
t0 = System.nanoTime();
tc.preProcess(threadCount);
long p2 = t.processTreeParallel(tc);
p2 += tc.postProcess();
double t2 = (System.nanoTime() - t0) * 1e-9;
long ref = (Tree.counter * (Tree.counter + 1)) / 2;
if (p1 != ref)
System.out.printf("ERROR: sum %d != reference %d\n", p1, ref);
if (p1 != p2)
System.out.printf("ERROR: sum %d != parallel %d\n", p1, p2);
if (tc.totalTasks != (2 << levelParallel)) {
System.out.printf("ERROR: ntasks %d != %d\n",
2 << levelParallel, tc.totalTasks);
}
// print timing
System.out.printf("tree height: %2d "
+ "sequential: %.6f "
+ "parallel with %3d threads and %6d tasks: %.6f "
+ "speedup: %.3f count: %d\n",
height, t1, threadCount, tc.totalTasks, t2, t1 / t2, ref);
}
}
// ============================================================================
class Tree {
static long counter; // counter for consecutive node numbering
int level; // node level
long value; // node value
Tree left; // left child
Tree right; // right child
// constructor
Tree(long value) {
this.value = value;
}
// generate a balanced binary tree of depth k
static Tree genTree(int k, int height) {
if (k < 0) {
return null;
} else {
Tree t = new Tree(++counter);
t.level = height - k;
t.left = genTree(k - 1, height);
t.right = genTree(k - 1, height);
return t;
}
}
// ========================================================================
// traverse a tree sequentially
long processTree() {
return value
+ ((left == null) ? 0 : left.processTree())
+ ((right == null) ? 0 : right.processTree());
}
// ========================================================================
// traverse a tree parallel
// This is where I was able to use the CompletionService
long processTreeParallel(TreeCalculation tc) {
tc.totalTasks = 0;
for(long i =0; i<(long)Math.pow(tc.levelParallel, 2); i++)
{
tc.incrementTasks();
tc.cs.submit(new Callable<Long>(){
@Override
public Long call() throws Exception {
return processTree();
}
});
}
Long result = Long.valueOf(0);
for(int i=0; i<(long)Math.pow(2,tc.levelParallel); i++) {
try{
result += tc.cs.take().get();
tc.decrementTasks();
}catch(Exception e){}
}
return result;
}
}