
Hello everyone. I have a misunderstanding about how thread pools work. The behavior I observe differs from the API description of ThreadPoolExecutor. When I use a LinkedBlockingQueue in the thread pool, it does not reuse threads: the pool waits for the keepAliveTime that was set in the constructor, then kills the thread and creates a new one. When I set keepAliveTime to something small, like 1 second or less, it deletes the thread and recreates it. But if I set it to a minute, new threads aren't created because maximumPoolSize doesn't allow it, and the queue is already full, so all tasks are rejected, while the threads whose keepAliveTime is set to a minute do nothing during this time. I am quite new to this and don't understand why it doesn't reuse these threads. After keepAliveTime expires, it kills these threads and, if the queue is full, creates new ones. Why does it work this way? As far as I understood from the API, it has to reuse threads if they are idle during keepAliveTime. It reuses threads when I use SynchronousQueue, but not with LinkedBlockingQueue.

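import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {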

    private volatile int remainingTasksCount;
    private volatile static ThreadPoolExecutor consumer = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));

    private static Runnable task = () -> {
        System.out.println(String.format("consumer %s, id %s, size %s, active count %s, queue %s",
                Thread.currentThread().getName(), Thread.currentThread().getId(),
                consumer.getPoolSize(), consumer.getActiveCount(), 3-consumer.getQueue().remainingCapacity()));
        String s = new String();
        synchronized (s) {
            try {
                s.wait(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    public static void main(String[] args) throws IOException {
        try {
            new Thread(() -> {
                while (true) {
                    try {
                        for (int i = 0; i < 5; i++) {
                            consumer.submit(task);
                        }
                        System.out.println("PUSH TASKS");
                        synchronized (Thread.currentThread()) {
                            Thread.currentThread().wait(10000);
                        }
                    } catch (Throwable th) {
                        System.out.println(th);
                    }
                }
            }).start();
        } catch (Throwable th) {
            System.out.println(th);
        }
    }
}

OUTPUT

PUSH TASKS
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 0
Disconnected from the target VM, address: '127.0.0.1:64434', transport: 'socket'

Process finished with exit code 1

But the next time the producer submits tasks, I get a RejectedExecutionException.

If I change keepAliveTime to 1 second, everything works well, but new threads are created.

PUSH TASKS
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
consumer pool-1-thread-3, id 17, size 1, active count 1, queue 2
PUSH TASKS
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 2
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0

I will be glad if someone could explain my mistake, or some basic principle that I missed.

  • TL;DR - how about Executors.newCachedThreadPool()? Commented Nov 7, 2017 at 12:05
  • That's the problem I described: SynchronousQueue reuses threads, but LinkedBlockingQueue does not (the API says that any implementer of the BlockingQueue interface reuses them). newCachedThreadPool creates as many threads as you have tasks: public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } Commented Nov 7, 2017 at 12:23
  • You should not be managing the queue that the thread pool uses. That is the issue. If you use what @rkosegi suggested then the queue gets managed by the ExecutorService. If you are trying to limit the active threads then use Executors.newFixedThreadPool(15). That will only allow 15 executing threads at once but will allow you to queue up as many jobs as you like. Commented Nov 7, 2017 at 13:27
  • Where have you seen that I manage the queue? I don't need to have 15 threads all the time while my program is executing. It will have load 2-3 times per day, but it will be a huge load. So I want to increase the thread count dynamically. I suppose that I can set a larger corePoolSize and call allowCoreThreadTimeOut(true), so that the thread count decreases when threads are idle. When a new task is submitted and the pool size is < corePoolSize, the pool will grow even though the queue is not full. Commented Nov 7, 2017 at 14:20
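
The idea in the last comment (a larger corePoolSize combined with allowCoreThreadTimeOut(true)) can be sketched as follows. This is only a minimal illustration, not the asker's final code: the class name CoreTimeoutDemo, the pool size of 4 and the 200 ms keep-alive are assumptions chosen to keep the run short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutDemo {

    /** Returns {pool size under load, queued tasks under load, pool size after idling}. */
    static int[] run() {
        // Assumed values for illustration: 4 core threads, 200 ms keep-alive.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(3));
        pool.allowCoreThreadTimeOut(true); // idle core threads may now terminate

        // Each task blocks until released, so every submit below corePoolSize
        // has to start a new thread instead of queuing.
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int sizeUnderLoad = pool.getPoolSize();
        int queuedUnderLoad = pool.getQueue().size();

        // Let the tasks finish, then wait (up to 5 s) for the idle threads to time out.
        release.countDown();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int sizeAfterIdle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {sizeUnderLoad, queuedUnderLoad, sizeAfterIdle};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("under load: poolSize=" + r[0] + ", queued=" + r[1]);
        System.out.println("after idling: poolSize=" + r[2]);
    }
}
```

With this configuration the pool grows while the queue is still empty, because threads are added whenever the worker count is below corePoolSize, and it shrinks again once the threads have been idle for longer than the keep-alive time.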

2 Answers

It is a race condition. If you follow submit() long enough (in the source code), you will arrive at ThreadPoolExecutor.execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /* long comment block removed */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

When your submit loop runs for the first time, execute creates new workers and hands them tasks directly, without going through the queue. The first task takes the addWorker+return path, because the pool is below corePoolSize. The next three go into the queue, which can accommodate all of them. The fifth finds the queue full, so the final addWorker call starts a second worker for it. The net effect is that 2 tasks start immediately and 3 wait in the queue.

On the second pass, both workers already exist, so every submit ends up at workQueue.offer. This may saturate the queue (depending on how fast the workers get to consume the new items), and when it does, the last-resort addWorker runs and fails, resulting in reject, as no new workers are allowed to be created.

Practically, if you start doing 'things' in your submit loop, it will eventually start working. For example, I tried println(i), and that was slow enough to get some tasks consumed and let the loop succeed. When I tried print(i), that was already too fast: it died on the 4th submit, so no tasks were consumed soon enough. So it is a delicate matter, as race conditions usually are.
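
The branches of execute() can be observed without any timing dependence if the tasks are made to block, so that no worker ever drains the queue. The following is a minimal sketch under that assumption; the class name ExecutePathDemo and the latch-based task are illustrative and not part of the original code. The pool parameters match the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutePathDemo {

    /** Returns one line per submitted task with the pool state right after execute(). */
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));

        // Every task blocks on the latch, so workers never take anything from the queue
        // while we are submitting. This removes the race from the picture.
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            String outcome;
            try {
                pool.execute(blocked);
                outcome = "accepted";
            } catch (RejectedExecutionException e) {
                outcome = "rejected";
            }
            log.add("task " + i + " " + outcome
                    + ", poolSize=" + pool.getPoolSize()
                    + ", queued=" + pool.getQueue().size());
        }

        release.countDown(); // let all accepted tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task 1 accepted, poolSize=1, queued=0   (core worker)
        // task 2 accepted, poolSize=1, queued=1   (queued)
        // task 3 accepted, poolSize=1, queued=2   (queued)
        // task 4 accepted, poolSize=1, queued=3   (queued, queue now full)
        // task 5 accepted, poolSize=2, queued=3   (non-core worker)
        // task 6 rejected, poolSize=2, queued=3   (queue full, pool at maximum)
    }
}
```

The sixth task plays the role of the failing submits in the second pass: the pool is already at maximumPoolSize and the queue is full, so there is nowhere to put it. In the real program the workers are idle rather than blocked, which is why the outcome there depends on how quickly they wake up.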


5 Comments

Thanks, you are right about how the thread pool works, I agree. But could you try setting TimeUnit.SECONDS in the constructor? Then it works correctly and fast. It kills threads that have already performed a task and creates new ones, but I don't want to create new threads, I want to reuse them, so I set this parameter to minutes.
@qmalt Yes, that is normal: if you reduce timeout, threads will terminate during the 10 seconds wait, and the pattern will start again from scratch with create-2-threads-and-store-3-tasks-in-3-slots, which will not fail.
I am still confused. Why does the thread pool not use the threads during the keep-alive time, when they are waiting for new tasks?
@qmalt The thread pool uses the threads the way you expect, but it takes time, as it happens asynchronously. It is simply possible to fill up the queue faster than the threads can pull tasks from it. Note that those worker threads are actual OS threads, and they are completely idle when you submit the tasks, so they need to wake up and get scheduled, whereas the submitter loop is a very fast and tight one, running inside a thread that is already active.
Thanks. I have seen that on the second iteration all tasks try to go to the queue, whose size is 3, so the other 2 are rejected. Before, I thought the second iteration would behave like the first, but the source code and your advice helped me!

I think you have some misunderstanding of how the thread pool works because of your code sample. I ran it and got output from 5 tasks followed by an endless stream of RejectedExecutionExceptions. This happens because, when the exception is thrown, Thread.currentThread().wait(10000); isn't invoked, so 5 more tasks are submitted immediately, and this repeats again and again, producing new exceptions. Try surrounding consumer.submit(task); with a try-catch block and you will see that only two threads process all tasks, as expected, because keepAliveTime is longer than the wait time. In the second sample keepAliveTime is shorter than the wait time, so after each wait a new non-core thread is created and you see different ids after each loop iteration. That is correct, because the previous non-core thread was stopped after being idle longer than keepAliveTime.
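
A minimal sketch of the try-catch suggestion, assuming the same pool parameters as in the question. The class name ReuseDemo and the bookkeeping (counters, the set of thread names) are illustrative additions, and the 10-second wait is replaced by waiting on the returned futures to keep the run short. How many submits of the second batch are rejected depends on timing, so no exact counts are claimed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReuseDemo {

    /** Returns {accepted submits, rejected submits, distinct worker threads used}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        Runnable task = () -> workerNames.add(Thread.currentThread().getName());

        int accepted = 0;
        int rejected = 0;
        for (int batch = 0; batch < 2; batch++) {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                try {
                    futures.add(pool.submit(task));
                    accepted++;
                } catch (RejectedExecutionException e) {
                    rejected++; // handled per submit, so the rest of the batch still runs
                }
            }
            // Wait for the accepted tasks of this batch before submitting the next one.
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        pool.shutdown();
        return new int[] {accepted, rejected, workerNames.size()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("accepted=" + r[0] + ", rejected=" + r[1]
                + ", distinct worker threads=" + r[2]);
    }
}
```

Because keepAliveTime is one minute and the whole run takes far less, no worker times out, so the number of distinct worker threads never exceeds maximumPoolSize (2): the same threads serve both batches.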

8 Comments

Change the TimeUnit.SECONDS param in your code to MINUTES and set the value to 1 in the constructor. You will see that your tasks are also rejected even though the threads are idle.
11 12 12 11 12 12 13 13 12 12. That's the output of your program. Here we can see that the thread pool creates new threads because you set the keepAlive param too small. A thread was waiting for a task, did not receive one, and was terminated, so on the next push of tasks the thread pool created a new thread with a new id. But I just changed the keepAlive param to minutes.
So the thread has to wait for a new task. Instead, the thread doesn't receive a new task and waits until the keepAlive time expires; then it is terminated and a new thread is created. In the meantime, with nothing to do, the thread pool rejects all tasks, because it can't create a new thread (due to the maxPoolSize param) and the queue is full.
Thread creation isn't related to the keepAlive param. Is your main question why tasks are rejected in the case of 5 tasks, 2 idle threads and a blocking queue of size 3?
Look at the topic title. My question is why the thread pool doesn't reuse threads.