An overview of the


In the Java world, if you want to run tasks in parallel, you can use ThreadPoolExecutor. In most cases using ThreadPoolExecutor directly is sufficient, but in some scenarios, such as sudden bursts of traffic, it is worth extending ThreadPoolExecutor to improve response time and throughput.

Most Java developers know how ThreadPoolExecutor works:

  • While the pool has fewer threads than the core size, it creates a new thread for each submitted task;
  • Once the pool has reached the core size, it puts new tasks into the queue;
  • When the queue is full (inserting the task fails), it starts creating additional threads up to the MAX size; once the thread count has reached MAX and the queue is still full, the task is rejected, and with the default AbortPolicy a RejectedExecutionException is thrown.
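The three steps above can be observed with a plain ThreadPoolExecutor. The following is a minimal sketch, not part of the original code: the class name DefaultPoolBehaviorDemo and the sizes (core 2, MAX 4, queue capacity 2) are chosen only for illustration. Every task blocks on a latch so that no thread becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class, not from the article: shows the default core -> queue -> MAX -> reject order.
class DefaultPoolBehaviorDemo {

    /** Submits 7 blocking tasks and records "poolSize/queueSize" (or "rejected") after each submission. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 7; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until the demo is over
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    log.add(pool.getPoolSize() + "/" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: pool is at MAX and the queue is full
                    log.add("rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints: 1/0, 2/0, 2/1, 2/2, 3/2, 4/2, rejected (one entry per line)
        run().forEach(System.out::println);
    }
}
```

The queue fills up (2/1, 2/2) before the third and fourth threads are created, which is exactly the ordering the rest of this article sets out to change.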

One small problem with this flow is that as soon as the core threads cannot keep up, tasks go straight into the queue. If the queue is very long and there are many tasks, tasks are enqueued and dequeued constantly.

Load testing showed that this enqueueing and dequeueing has a measurable cost. By contrast, the Java API's SynchronousQueue is a zero-capacity queue in which tasks are handed directly from the producer to the consumer without being stored.
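The hand-off behavior of SynchronousQueue can be seen in isolation. This is a small sketch with an illustrative class name (SynchronousQueueDemo is not part of the article's code): a non-blocking offer fails when no consumer is waiting, while a blocking put completes only when another thread takes the element.

```java
import java.util.concurrent.SynchronousQueue;

// Illustrative class, not from the article.
class SynchronousQueueDemo {

    /** A non-blocking offer with no consumer waiting: nothing can accept the element, so it fails. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    /** The producer blocks in put() until the consumer's take() receives the element directly. */
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("task");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.take();
            producer.join();
            // size() is always 0: the queue never stores elements
            return received + " (queue size " + queue.size() + ")";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(handOff());              // task (queue size 0)
    }
}
```

Inside a ThreadPoolExecutor, a failed offer is what triggers the creation of a new thread, which is why a SynchronousQueue-backed pool (such as the one returned by Executors.newCachedThreadPool) grows its thread count instead of buffering tasks.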

In addition, if many tasks pile up in the queue while the threads beyond the core size have not yet been created, only the core threads are busy working through the backlog, which also hurts performance.

When the core threads cannot keep up, can we delay the enqueue operation and instead bring the MAX threads up as soon as possible to help with the tasks?

The idea: as long as the number of threads in the pool is less than the maximum, keep creating threads for new tasks; only once the maximum number of threads is reached are tasks inserted into the queue.

We do this by overriding the queue's offer method.

@Override
public boolean offer(Runnable o) {
    int currentPoolThreadSize = executor.getPoolSize();
    // The pool is already at its maximum size, so the task has to be queued
    if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
        return super.offer(o);
    }
    // Fewer unfinished tasks than threads means some thread is idle: queue the task, no new thread is needed
    if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(o);
    }

    // The pool has not reached its maximum size: return false so the executor creates a thread instead of queuing the task
    if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
        return false;
    }

    return super.offer(o);
}

Notice this check:

if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
    return super.offer(o);
}

It means that the existing threads can still cope: some of them are idle, so the task is simply inserted into the queue. How do we tell whether there are idle threads in the pool? We use a counter that is incremented every time the execute method is called and decremented in afterExecute, once a task has finished.

@Override
public void execute(Runnable command) {
    submittedTaskCount.incrementAndGet();
    // The code is not complete.
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    submittedTaskCount.decrementAndGet();
}

That way, when

executor.getSubmittedTaskCount() < currentPoolThreadSize

is true, there are idle threads.


The complete code


EnhancedThreadPoolExecutor class


package executer;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /** Number of tasks that have been submitted but have not yet completed. Decremented by 1 when a task finishes. */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
        workQueue.setExecutor(this);
    }

    /** Overrides the parent's afterExecute method to decrement the counter by 1 when a task completes. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }


    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }


    /** Overrides the parent's execute method to increment the counter by 1 before the task is handed to the pool. */
    @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // On a RejectedExecutionException, try once more to put the task into the queue; if that also fails, throw the exception.
            BlockingQueue<Runnable> taskQueue = super.getQueue();
            if (taskQueue instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue) taskQueue;
                if (!queue.forceTaskIntoQueue(command)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue full");
                }
            } else {
                submittedTaskCount.decrementAndGet();
                throw rx;
            }
        }
    }
}

TaskQueue

package executer;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private EnhancedThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EnhancedThreadPoolExecutor exec) {
        executor = exec;
    }

    public boolean forceTaskIntoQueue(Runnable o) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is closed. Tasks cannot be added to queues.");
        }
        return super.offer(o);
    }

    @Override
    public  boolean offer(Runnable o) {
        int currentPoolThreadSize = executor.getPoolSize();
        // If the number of threads in the pool reaches the maximum, the task is added to the queue
        if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
            return super.offer(o);
        }
        // Fewer unfinished tasks than threads means some thread is idle: queue the task, no new thread is needed
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(o);
        }

        // The pool has not reached its maximum size: return false so the executor creates a thread instead of queuing the task
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        return super.offer(o);
    }
}

TestExecuter

package executer;

import java.util.concurrent.TimeUnit;

public class TestExecuter {
    private static final int CORE_SIZE = 5;

    private static final int MAX_SIZE = 10;

    private static final long KEEP_ALIVE_TIME = 30;

    private static final int QUEUE_SIZE = 5;

    static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));

    public static void main(String[] args){
        for (int i = 0; i < 15; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // sleep is a static method, so call it on Thread directly
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("The current number of threads in the thread pool is:" + executor.getPoolSize()
                    + ", the number of tasks waiting to be executed in the queue is:" + executor.getQueue().size());
        }
        // Let the queued tasks finish, then allow the JVM to exit
        executor.shutdown();
    }
}

Let's run the code and see whether it behaves as expected. Running the main method of the TestExecuter class directly gives:

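The current number of threads in the thread pool is:1, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:2, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:3, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:4, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:6, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:7, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:8, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:9, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:1
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:2
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:3
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:4
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:5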

You can see that while the number of threads grows past the core size, there are still no tasks in the queue. Only once the number of threads reaches MAX, which is 10, do tasks appear in the queue. This matches our expectations.

If we comment out the offer method in the TaskQueue class, that is, we no longer override the queue's offer method, the result is as follows:

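The current number of threads in the thread pool is:1, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:2, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:3, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:4, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:0
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:1
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:2
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:3
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:4
The current number of threads in the thread pool is:5, the number of tasks waiting to be executed in the queue is:5
The current number of threads in the thread pool is:6, the number of tasks waiting to be executed in the queue is:5
The current number of threads in the thread pool is:7, the number of tasks waiting to be executed in the queue is:5
The current number of threads in the thread pool is:8, the number of tasks waiting to be executed in the queue is:5
The current number of threads in the thread pool is:9, the number of tasks waiting to be executed in the queue is:5
The current number of threads in the thread pool is:10, the number of tasks waiting to be executed in the queue is:5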

You can see that as soon as the number of threads reaches the core size, tasks start to accumulate in the queue, and additional threads are created only after the queue is full.


Further thoughts


What should you do when ThreadPoolExecutor throws a RejectedExecutionException? The code in this article tries once more to insert the task into the queue and, if that still fails, throws a RejectedExecutionException.

@Override
public void execute(Runnable command) {
    submittedTaskCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        // On a RejectedExecutionException, try once more to put the task into the queue; if that also fails, throw the exception.
        BlockingQueue<Runnable> taskQueue = super.getQueue();
        if (taskQueue instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue) taskQueue;
            if (!queue.forceTaskIntoQueue(command)) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException("Queue full");
            }
        } else {
            submittedTaskCount.decrementAndGet();
            throw rx;
        }
    }
}

The TaskQueue class provides the forceTaskIntoQueue method for this purpose: it inserts the task into the queue directly, bypassing the overridden offer logic.

Another solution is to use a second thread pool: catch the RejectedExecutionException thrown by the first pool and hand the task to the second pool.
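A minimal sketch of the two-pool approach might look like the following. The class FallbackExecutor, its fallbackCount counter, and the pool sizes in demo() are assumptions made for illustration; they are not part of the code above. The primary pool is deliberately tiny (one thread, no queue capacity) so that rejections are easy to trigger.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not from the article: routes rejected tasks to a second executor.
class FallbackExecutor implements Executor {
    private final Executor primary;
    private final Executor fallback;
    private final AtomicInteger fallbackCount = new AtomicInteger();

    FallbackExecutor(Executor primary, Executor fallback) {
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    public void execute(Runnable command) {
        try {
            primary.execute(command);
        } catch (RejectedExecutionException rejected) {
            // The first pool is saturated: hand the task to the second pool
            fallbackCount.incrementAndGet();
            fallback.execute(command);
        }
    }

    int getFallbackCount() {
        return fallbackCount.get();
    }

    /** Returns {tasks routed to the fallback pool, tasks completed}. */
    static int[] demo() {
        ExecutorService primary = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        ExecutorService fallback = Executors.newSingleThreadExecutor();
        FallbackExecutor executor = new FallbackExecutor(primary, fallback);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keep the primary thread busy during submission
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        primary.shutdown();
        fallback.shutdown();
        try {
            primary.awaitTermination(5, TimeUnit.SECONDS);
            fallback.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { executor.getFallbackCount(), completed.get() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("fallback=" + result[0] + ", completed=" + result[1]); // fallback=2, completed=3
    }
}
```

One design point to keep in mind: the second pool here uses an unbounded queue, so it never rejects but can accumulate a backlog. If the fallback pool is bounded as well, its own RejectedExecutionException still has to be handled by the caller.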