
My goal is to compute the sum of the elements in a binary tree using an ExecutorService in Java and then collect the result of each task using a CompletionService.

The user gives the tree height, the level at which the parallelism should start, and the number of threads to use. I know that the ExecutorService should create exactly as many threads as the user specifies, and that the completion service should spawn exactly N tasks in the preProcess method, where N is 2^(level of parallelism), since level n of a full binary tree has 2^n nodes.

My problem is that I don't know how to start traversing the tree from a given level, or how to use the CompletionService to gather the results in my postProcess method. In addition, each time a new task is spawned the total number of tasks should increase by one, and each time the CompletionService returns a result the number of open tasks should decrease by one.

I was able to use the CompletionService in the processTreeParallel function, but I really don't understand how I can use it in my postProcess method.

Here is my code:

import java.util.concurrent.*;

public class TreeCalculation {
    // tree level to go parallel
    int levelParallel;
    // total number of generated tasks
    long totalTasks;
    // current number of open tasks
    long nTasks;
    // total height of tree
    int height;
    // Executors
    ExecutorService exec;
    CompletionService<Long> cs;
    TreeCalculation(int height, int levelParallel) {
        this.height = height;
        this.levelParallel = levelParallel;
    }

    void incrementTasks() {
        ++nTasks;
        ++totalTasks;
    }

    void decrementTasks() {
        --nTasks;
    }

    long getNTasks() {
        return nTasks;
    }
    // Where the ExecutorService should be initialized 
    // with a specific threadCount
    void preProcess(int threadCount) {

        exec = Executors.newFixedThreadPool(threadCount);
        cs = new ExecutorCompletionService<Long>(exec);
        nTasks = 0;
        totalTasks = 0;
    }
    // Where the CompletionService should collect the results;
    long postProcess() {
        long result = 0;
        return result;
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println(
                "usage: java Tree treeHeight levelParallel nthreads\n");
            return;
        }
        int height = Integer.parseInt(args[0]);
        int levelParallel = Integer.parseInt(args[1]);
        int threadCount = Integer.parseInt(args[2]);

        TreeCalculation tc = new TreeCalculation(height, levelParallel);

        // generate balanced binary tree
        Tree t = Tree.genTree(height, height);

        //System.gc();

        // traverse sequential
        long t0 = System.nanoTime();
        long p1 = t.processTree();
        double t1 = (System.nanoTime() - t0) * 1e-9;

        t0 = System.nanoTime();
        tc.preProcess(threadCount);
        long p2 = t.processTreeParallel(tc);
        p2 += tc.postProcess();
        double t2 = (System.nanoTime() - t0) * 1e-9;

        long ref = (Tree.counter * (Tree.counter + 1)) / 2;
        if (p1 != ref)
            System.out.printf("ERROR: sum %d != reference %d\n", p1, ref);
        if (p1 != p2)
            System.out.printf("ERROR: sum %d != parallel %d\n", p1, p2);
        if (tc.totalTasks != (2 << levelParallel)) {
            System.out.printf("ERROR: ntasks %d != %d\n", 
                2 << levelParallel, tc.totalTasks);
        }

        // print timing
        System.out.printf("tree height: %2d "
            + "sequential: %.6f "
            + "parallel with %3d threads and %6d tasks: %.6f  "
            + "speedup: %.3f count: %d\n",
            height, t1, threadCount, tc.totalTasks, t2, t1 / t2, ref);
    }
}

// ============================================================================

class Tree {

    static long counter; // counter for consecutive node numbering

    int level; // node level
    long value; // node value
    Tree left; // left child
    Tree right; // right child

    // constructor
    Tree(long value) {
        this.value = value;
    }

    // generate a balanced binary tree of depth k
    static Tree genTree(int k, int height) {
        if (k < 0) {
            return null;
        } else {
            Tree t = new Tree(++counter);
            t.level = height - k;
            t.left = genTree(k - 1, height);
            t.right = genTree(k - 1, height);
            return t;
        }
    }

    // ========================================================================
    // traverse a tree sequentially

    long processTree() {
        return value
            + ((left == null) ? 0 : left.processTree())
            + ((right == null) ? 0 : right.processTree());
    }

    // ========================================================================
    // traverse a tree parallel
    // This is where I was able to use the CompletionService
    long processTreeParallel(TreeCalculation tc) {

        tc.totalTasks = 0;
        for(long i =0; i<(long)Math.pow(tc.levelParallel, 2); i++)
        {
            tc.incrementTasks();
            tc.cs.submit(new Callable<Long>(){
                @Override
                public Long call() throws Exception {
                    return processTree();
                }

            });
        }
        Long result = Long.valueOf(0);
        for(int i=0; i<(long)Math.pow(2,tc.levelParallel); i++) {
            try{
                result += tc.cs.take().get();
                tc.decrementTasks();
            }catch(Exception e){}

        }
        return result;
    }
}
  • See ForkJoinPool. Commented May 13, 2018 at 11:05

2 Answers


The basic idea here is that you traverse the tree, and compute the results just like you did in the processTree method. But as soon as the level is reached at which the parallel computation is supposed to start (the levelParallel), you just spawn a task that actually calls processTree internally. This will take care of the remaining part of the tree.

processTreeParallel             0
                               / \    
                              /   \    
processTreeParallel          1     2
                            / \   / \    
processTreeParallel        3   4 5   6  <- levelParallel
                           |   | |   |
processTree call for each: v   v v   v
                          +---------------+
tasks for executor:       |T   T T   T    |
                          +---------------+
completion service         |
fetches tasks and          v
sums them up:              T+T+T+T  -> result

You then have to add the result computed by the sequential part of the processTreeParallel method to the task results that are summed up by the completion service.

The processTreeParallel method could thus be implemented like this:

long processTreeParallel(TreeCalculation tc)
{
    if (level < tc.levelParallel)
    {
        long leftResult = left.processTreeParallel(tc);
        long rightResult = right.processTreeParallel(tc);
        return value + leftResult + rightResult;
    }
    tc.incrementTasks();
    tc.cs.submit(new Callable<Long>()
    {
        @Override
        public Long call() throws Exception
        {
            return processTree();
        }
    });
    return 0;
}

The complete program is shown here:

import java.util.concurrent.*;

public class TreeCalculation
{
    // tree level to go parallel
    int levelParallel;
    // total number of generated tasks
    long totalTasks;
    // current number of open tasks
    long nTasks;
    // total height of tree
    int height;
    // Executors
    ExecutorService exec;
    CompletionService<Long> cs;

    TreeCalculation(int height, int levelParallel)
    {
        this.height = height;
        this.levelParallel = levelParallel;
    }

    void incrementTasks()
    {
        ++nTasks;
        ++totalTasks;
    }

    void decrementTasks()
    {
        --nTasks;
    }

    long getNTasks()
    {
        return nTasks;
    }

    // Where the ExecutorService should be initialized
    // with a specific threadCount
    void preProcess(int threadCount)
    {
        exec = Executors.newFixedThreadPool(threadCount);
        cs = new ExecutorCompletionService<Long>(exec);
        nTasks = 0;
        totalTasks = 0;
    }

    // Where the CompletionService should collect the results;
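    long postProcess()
    {
        // No new tasks are accepted after this call; tasks that were
        // already submitted still run to completion.
        exec.shutdown();
        long result = 0;
        // Take one result per open task, so the loop is driven by the
        // number of tasks that were actually submitted.
        while (getNTasks() > 0)
        {
            try
            {
                result += cs.take().get();
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
            catch (ExecutionException e)
            {
                // The task itself failed; report it and go on
                e.printStackTrace();
            }
            decrementTasks();
        }
        return result;
    }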

    public static void main(String[] args)
    {

        int height = 22;
        int levelParallel = 3;
        int threadCount = 4;
        if (args.length != 3)
        {
            System.out.println(
                "usage: java Tree treeHeight levelParallel nthreads\n");
            System.out.println("Using default values for test");
        }
        else
        {
            height = Integer.parseInt(args[0]);
            levelParallel = Integer.parseInt(args[1]);
            threadCount = Integer.parseInt(args[2]);

        }

        TreeCalculation tc = new TreeCalculation(height, levelParallel);

        // generate balanced binary tree
        Tree t = Tree.genTree(height, height);

        // traverse sequential
        long t0 = System.nanoTime();
        long p1 = t.processTree();
        double t1 = (System.nanoTime() - t0) * 1e-9;

        t0 = System.nanoTime();
        tc.preProcess(threadCount);
        long p2 = t.processTreeParallel(tc);
        p2 += tc.postProcess();
        double t2 = (System.nanoTime() - t0) * 1e-9;

        long ref = (Tree.counter * (Tree.counter + 1)) / 2;
        if (p1 != ref)
            System.out.printf("ERROR: sum %d != reference %d\n", p1, ref);
        if (p1 != p2)
            System.out.printf("ERROR: sum %d != parallel %d\n", p1, p2);
        if (tc.totalTasks != (1 << levelParallel))
        {
            System.out.printf("ERROR: ntasks %d != %d\n", 1 << levelParallel,
                tc.totalTasks);
        }

        // print timing
        System.out.printf("tree height: %2d\n" 
            + "sequential: %.6f\n"
            + "parallel with %3d threads and %6d tasks: %.6f\n"
            + "speedup: %.3f count: %d\n",
            height, t1, threadCount, tc.totalTasks, t2, t1 / t2, ref);
    }
}

// ============================================================================

class Tree
{

    static long counter; // counter for consecutive node numbering

    int level; // node level
    long value; // node value
    Tree left; // left child
    Tree right; // right child

    // constructor
    Tree(long value)
    {
        this.value = value;
    }

    // generate a balanced binary tree of depth k
    static Tree genTree(int k, int height)
    {
        if (k < 0)
        {
            return null;
        }

        Tree t = new Tree(++counter);
        t.level = height - k;
        t.left = genTree(k - 1, height);
        t.right = genTree(k - 1, height);
        return t;
    }

    // ========================================================================
    // traverse a tree sequentially

    long processTree()
    {
        return value 
            + ((left == null) ? 0 : left.processTree())
            + ((right == null) ? 0 : right.processTree());
    }

    // ========================================================================
    // traverse a tree parallel
    long processTreeParallel(TreeCalculation tc)
    {
        if (level < tc.levelParallel)
        {
            long leftResult = left.processTreeParallel(tc);
            long rightResult = right.processTreeParallel(tc);
            return value + leftResult + rightResult;
        }
        tc.incrementTasks();
        tc.cs.submit(new Callable<Long>()
        {
            @Override
            public Long call() throws Exception
            {
                return processTree();
            }
        });
        return 0;
    }
}

I assume that you wish to speed up the "tree processing" task by running different parts of the calculation in parallel.

As far as I can tell, your current solution submits multiple callables that each do the exact same thing, namely to each process the whole tree. This means that you are doing the same overall task multiple times. This is unlikely to be what you want. Instead, you likely want to split the overall task into several partial tasks. The partial tasks should be non-overlapping and together cover the whole task. With those partial tasks, you then do them in parallel, and collect the results somehow.

Since you are processing a tree, you will have to find some way to divide it into suitable parts, preferably one that is easy to design and implement and that yields reasonably large chunks, so that the parallelization is more effective.
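One way to get such non-overlapping parts is sketched below. It is only an illustration under some assumptions: the PNode and PartitionedSum classes and the split helper are hypothetical names that are not part of your code, and the tree is assumed to be balanced as in genTree. The sketch first walks the upper part of the tree sequentially, collecting the nodes at the chosen level, and then submits one task per collected subtree.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical minimal node type; the Tree class from the question would play this role.
class PNode {
    final long value;
    PNode left, right;

    PNode(long value) { this.value = value; }

    // Sequential sum of this node and everything below it.
    long sum() {
        return value + (left == null ? 0 : left.sum()) + (right == null ? 0 : right.sum());
    }
}

class PartitionedSum {
    // Builds a balanced tree with levels 0..height, numbering the nodes 1, 2, 3, ...
    static PNode build(int height, long[] counter) {
        if (height < 0) return null;
        PNode n = new PNode(++counter[0]);
        n.left = build(height - 1, counter);
        n.right = build(height - 1, counter);
        return n;
    }

    // Adds the values above `level` to acc[0] and collects the nodes at `level` into roots.
    static void split(PNode n, int depth, int level, List<PNode> roots, long[] acc) {
        if (n == null) return;
        if (depth == level) {
            roots.add(n);
            return;
        }
        acc[0] += n.value;
        split(n.left, depth + 1, level, roots, acc);
        split(n.right, depth + 1, level, roots, acc);
    }

    static long parallelSum(PNode root, int level, int threads) {
        List<PNode> roots = new ArrayList<>();
        long[] upper = new long[1];
        split(root, 0, level, roots, upper);

        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<Long> cs = new ExecutorCompletionService<>(exec);
            // One task per subtree: the parts do not overlap and together
            // cover everything at or below `level`.
            for (PNode r : roots) {
                cs.submit(() -> r.sum());
            }
            long total = upper[0];
            // Take exactly as many results as tasks were submitted.
            for (int i = 0; i < roots.size(); i++) {
                total += cs.take().get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        PNode root = build(10, new long[1]);
        System.out.println(parallelSum(root, 2, 4) == root.sum());
    }
}
```

Collecting the subtree roots up front keeps the number of submitted tasks and the number of take() calls tied to the same list, so the two cannot drift apart.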

The current parallel calculation is also wrong, as you can see from running the program with input like:

java TreeCalculation 10 2 4

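The calls to Math.pow(tc.levelParallel, 2) and Math.pow(2,tc.levelParallel) also differ: the first computes levelParallel squared, the second 2 to the power of levelParallel. In general the submit loop and the collect loop therefore run a different number of times, and if fewer tasks are submitted than are taken, cs.take() blocks forever.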
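To make the difference between the two calls concrete, here is a small sketch. TaskCountDemo and nodesAtLevel are hypothetical names used only for illustration; the last line shows the `2 << levelParallel` expression from the check in your main method.

```java
class TaskCountDemo {
    // Number of nodes at a given level of a full binary tree (root is level 0).
    static long nodesAtLevel(int level) {
        return 1L << level;
    }

    public static void main(String[] args) {
        int level = 3;
        System.out.println((long) Math.pow(level, 2)); // level squared: 9
        System.out.println((long) Math.pow(2, level)); // 2^level: 8
        System.out.println(nodesAtLevel(level));       // 1 << level: 8
        System.out.println(2 << level);                // 2^(level + 1): 16
    }
}
```

A shift avoids the round trip through double and makes it harder to swap the arguments by accident.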

Also beware of memory consistency issues. I did not see any at a glance, though you do mutate memory here and there.
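In the code as posted, the counters appear to be modified only by the thread that submits and collects the tasks, and the results come back through Future.get(), which provides the necessary happens-before ordering. If the tasks themselves ever updated the counters, plain long fields would no longer be safe. A minimal sketch of a thread-safe variant, assuming a hypothetical TaskCounter class in place of the nTasks and totalTasks fields:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical thread-safe variant of the nTasks/totalTasks bookkeeping.
class TaskCounter {
    private final AtomicLong openTasks = new AtomicLong();
    private final AtomicLong totalTasks = new AtomicLong();

    void increment() {
        openTasks.incrementAndGet();
        totalTasks.incrementAndGet();
    }

    void decrement() {
        openTasks.decrementAndGet();
    }

    long open() { return openTasks.get(); }

    long total() { return totalTasks.get(); }
}

class TaskCounterDemo {
    // Lets several threads increment one shared counter and returns the total.
    static long countFromThreads(int threads, int perThread) {
        TaskCounter counter = new TaskCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.total();
    }

    public static void main(String[] args) {
        System.out.println(countFromThreads(4, 1000)); // prints 4000
    }
}
```

With plain `++nTasks` on a shared long field, the same experiment could lose updates, because the increment is a separate read and write.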
