Multithreaded design can maximize the computing power of multi-core processors, improve throughput and performance. However, if threads are used freely without control, the performance of the system will be adversely affected.

Threads are lightweight compared to processes, but they still take time to create and close. If you create a thread for each small task, it is likely that the creation and destruction of the thread will take longer than the time consumed by the thread task. Secondly, threads themselves also need to occupy memory space, a large number of threads will seize precious memory resources.

Therefore, the use of threads needs to master a certain degree, and then a limited range of increasing the number of threads can improve the performance of the system, once beyond this range, a large number of threads will only drag down the whole system.

What is a thread pool

To prevent the system from constantly creating and destroying threads, we can reuse the created threads. We can use a thread pool to maintain threads. When you need to use threads, you can take any idle thread from the pool. When you’re done, you don’t have to shut them down immediately.

In short, after reusing the thread pool, the thread creator programs to get free threads from the thread pool, and closing the thread becomes returning the thread from the thread pool.

Create a thread pool

The members of the thread pool are in the java.util.concurrent package, which is at the heart of the JDK and its distribution. Where ThreadPoolExecutor represents a thread pool. By following Executors, you can obtain a thread pool with a specific function.

2.1 newFixedThreadPool () method

This method returns a thread pool with a fixed number of threads. The number of threads in this thread pool is always the same. When a new task is submitted, it is executed immediately if there are idle threads in the thread pool. If no, the new task will be temporarily stored in a task queue. When there are idle threads, the queue in the task queue will be processed.

2.2 newSingleThreadExecutor () method

This method returns a thread pool with only one thread. If unnecessary tasks are submitted to the thread pool, the tasks are stored in a task queue and executed in first-in, first-out order until the thread is idle.

2.3 newCachedThreadPool () method

This method returns a thread pool that can adjust the number of threads as needed. The number of threads in the thread pool is uncertain, but if there are free threads that can be reused, reusable threads are preferred. If all threads are working and a new task is submitted, a new thread is created to process the task. All threads will return to the thread pool for reuse after completing the current task.

2.4 newSingleThreadScheduledExecutor () method

This method returns a ScheduledExecutorService object with a thread pool size of 1. ScheduledExecutorService interface The ScheduledExecutorService interface extends the function of executing a task at a given time, such as after a fixed delay, or periodically.

2.5 newScheduledThreadPool () method

This method returns a ScheduledExecutorService object, but the thread pool can execute the number of threads.

Create a thread pool of fixed size

public class ThreadPoolThread {

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            MyTask myTask = new MyTask();
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for(int i = 0; i< 10 ; i++){ executorService.submit(myTask); }}}}Copy the code
1562554721820:Thread ID: 12
1562554721820:Thread ID: 15
1562554721820:Thread ID: 16
1562554721820:Thread ID: 13
1562554721820:Thread ID: 14
1562554722821:Thread ID: 15
1562554722821:Thread ID: 16
1562554722821:Thread ID: 12
1562554722821:Thread ID: 13
1562554722821:Thread ID: 14
Copy the code

Plan to perform tasks

The newScheduledThreadPool() method returns a ScheduledExecutorService object that can schedule threads as needed. The main methods are as follows

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit); public ScheduledFuture<? > scheduleAtFixedRate(Runnablecommand,long initialDelay, long period,TimeUnit unit); public ScheduledFuture<? > scheduleWithFixedDelay(Runnablecommand,long initialDelay,long delay,TimeUnit unit);
Copy the code

Unlike other threads, ScheduledExecutorService does not necessarily schedule tasks immediately. It actually plays the role of scheduling tasks, and will schedule tasks at the specified time.

Schedule () schedules tasks once at a given time. The scheduleAtFixedRate() and scheduleWithFixedDelay() methods schedule tasks periodically, but there are differences. The scheduleAtFixedRate() method has a fixed task scheduling frequency. It starts from the time when the previous task starts to be executed and then schedules the next task at the specified time. The scheduleWithFixedDelay() method is used to schedule tasks after the completion of the above tasks.

public class ScheduleExecutorServiceDemo {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}
Copy the code
1562555518798
1562555520798
1562555522798
1562555524799
1562555526800
Copy the code

You can see that the task is scheduled every two seconds.

If the execution time of a task is longer than the scheduled time, the task will be invoked immediately after the completion of the previous task.

Change the code to 8 seconds

Thread.sleep(8000);
Copy the code
1562555680333
1562555688333
1562555696333
1562555704333
Copy the code

The scheduler does not actually guarantee that the task will continue to be invoked indefinitely; if the task itself throws an exception, all subsequent execution will be interrupted.

Internal implementation of thread pools

For several core thread pools, the thread pools are created using the ThreadPoolExecutor class internally, although they have different functional characteristics.

Look at a few thread pool create source code:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }
Copy the code

ThreadPoolExecutor (ThreadPoolExecutor, ThreadPoolExecutor) ¶

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Copy the code
  • CorePoolSize: Specifies the number of threads in the thread pool.
  • MaximumPoolSize: Specifies the maximum number of threads in the thread pool.
  • KeepAliveTime: When the number of threads in the thread pool exceeds corePoolSize, the lifetime of the excess idle threads, i.e. how long it takes to destroy the idle threads exceeding corePoolSize.
  • Unit: keepAliveTime unit.
  • WorkQueue: A queue of submitted tasks that have not yet been executed.
  • ThreadFacotry: Thread factory, used to create threads.
  • Handler: rejects the policy. How to reject tasks when there are too many to handle.

3.1 workQueue- Task queue

The workQueue parameter refers to the queue of submitted but not executed tasks. It is an object of the BlockingQueue interface and is used only to hold Runnable objects. There are several BlockingQueue interfaces that can be used in the ThreadPoolExecutor construct:

  • Commit queue directly: provided by the SynchronousQueue object. SynchronousQueue does not have the capacity to wait for a corresponding delete for each insert or for each delete for an insert. SynchronousQueue if new tasks are always submitted to the thread, if no idle process attempts to create a new thread, and if the number of threads has reached its maximum, the policy is rejected. Using SynchronousQueue usually requires a large maximumPoolSize value, otherwise it is easy to enforce a reject policy.
  • Bounded task queues: A bounded task queue is implemented using the ArrayBlockingQueue class. The constructor of the ArrayBlockingQueue class must take a capacity parameter indicating the maximum capacity of the queue. When a bounded task queue is used, if a new task is to be executed, the thread is created first if the actual thread size of the thread pool is smaller than corePoolSize, and the task is added to the wait queue if the actual thread size is larger than corePoolSize. If the wait queue is full, a new thread is created to execute the task if the total number of threads is not larger than maximumPoolSize, or a reject policy is implemented if the total number of threads is larger than maximumPoolSize.
  • Unbounded task queues: Unbounded task queues are implemented using the LinkedBlockingQueue class. Compared with the bounded task queue, the unbounded task queue does not fail to join the queue. Use LinkedBlockingQueue When a new task needs to be executed by a thread that is smaller than corePoolSize, a new thread is created, but no longer grows after corePoolSize is reached. If a new task is added later, the queue is directly entered to wait.
  • Priority task queue: Priority task queue with execution priority. Implemented through the PriorityBlockingQueue class, you can control the order in which tasks are executed. It’s a special unbounded queue. The PriorityBlockingQueue class can execute tasks in their own priority order.

4 Rejection Strategy

A reject policy is implemented when the number of tasks exceeds the actual capacity of the system. The rejection policy can be described as a remedy when the system is overloaded.

There are four built-in rejection policies in the JDK:

  • AbortPolicy policy: This policy directly throws an exception to prevent it from working properly.
  • CallerRunsPolicy policy: This policy runs the currently discarded task directly in the current caller thread as long as the thread pool is not closed. This does not actually discard the thread, but it does degrade the performance of the task submission thread.
  • DiscardOldestPolicy policy: This policy discards the oldest request, which is a task to be executed, and attempts to commit the current task again.
  • DiscardPolicy: Discards unprocessed tasks without processing them.

The preceding policies implement the RejectedExecutionHandler interface. You can extend the RejectedExecutionHandler interface if the preceding policies cannot meet the requirements of actual development.

RejectedExecutionHandler Interface construction:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Copy the code

Custom rejection policy:

Public class RejectThreadPoolDemo {public static class MyTask implements Runnable{@override public voidrun() {
            System.out.println(System.currentTimeMillis() + ": Thread ID : " + Thread.currentThread().getId());
            try{
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.privilegedThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString() + "Rejected"); }});for (int i = 0;i<Integer.MAX_VALUE;i++){
           es.submit(myTask);
           Thread.sleep(10);
        }
    }

}
Copy the code
1562575292467: Thread ID : 14 1562575292478: Thread ID : 15 1562575292489: Thread ID : 16 Java. Util. Concurrent. FutureTask @ b4c966a rejected Java. Util. Concurrent. FutureTask @ 2 f4d3709 rejection Java. Util. Concurrent. FutureTask @ 4 e50df2e rejectionCopy the code

5 Custom thread creation: ThreadFactory

ThreadFactory is an interface that has only one method for creating threads.

Thread newThread(Runnable r);
Copy the code

With custom thread creation we can keep track of how many threads were created in the thread pool when, custom thread names, and so on.

public class ThreadFactoryDemo {

    static volatile int i = 0;

    public static class TestTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) {
        TestTask testTask = new TestTask();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r,"test--" + i);
                i++;
                returnthread; }});for(int i = 0; i<5; i++){ threadPoolExecutor.submit(testTask); }}}Copy the code
test--0
test1 -test4 -test2 -test--3
Copy the code

6 Extend the thread pool

While the JDK has helped us implement stable thread pools, what if we wanted to extend thread pools to monitor, for example, the start and end times of task execution.

ThreadPoolExecutor is an extensible thread pool that provides beforExecutor(),afterExecutor() and terminated() interfaces to expand it.

public class ThreadFactoryDemo {

    static volatile int i = 0;

    public static class TestTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) {
        TestTask testTask = new TestTask();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r,"test--" + i);
                i++;
                return thread;
            }
        }){
            @Override
            protected void beforeExecute(Thread t,Runnable r){
                System.out.println("Task ----- Ready to execute"); }};for(int i = 0; i<5; i++){ threadPoolExecutor.submit(testTask); }}}Copy the code
Task ----- Preparing to execute task----- Preparing to executetest--2 task----- Ready to executetest--1 task----- Ready to executetest--4 task----- Ready to executetest--3
test--0
Copy the code

7 Submit and execute

7.1 the execute () method

Execute submits only one Runnable object, and the return value of this method is void. If the thread is executed, it has no relation to the main thread. You can set some variables to get the result of the thread’s execution. When a thread throws an exception during execution, the main thread will not be able to get the exception information, and only through ThreadFactory can the thread’s exception handling class be aware of the exception in the submitted thread.

7.2 sumbit () method

The sumbit() method takes three forms:

public Future<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
Copy the code

The sumbit method returns a Future object that represents the result of the thread’s execution. When the main thread calls the Future’s GET method, it gets the result data from the thread. If an exception occurs during the execution of a thread, GET gets information about the exception.