An overview of the

Everyone on the back end knows about thread pools, even if you haven’t used them yourself, you’ve probably heard about them or understood them. Sometimes will go to in-depth understanding, the result is often felt that they understand, after a period of time forget. Because in the daily development, we do not need to use the thread pool, a lot of tools and frameworks are written, we directly call the interface is done.

Many things without personal practice and in-depth thinking, only reading articles and books is impossible to truly understand. I’ve read articles about thread pools many times in the past, forgotten about them and re-read them six months later, with no real understanding. Therefore, I am going to practice, since the peacetime development of the time to use, I will do a project to use.

Just do it, I chose a Redis distributed lock as a practice project, there is a regular renewal function, is to use the thread pool to run the submitted task, the key renewal, detailed not to say, want to know can see this Redis distributed lock implementation -Redisson.

In the implementation of Redis renewal function, while looking at others how to achieve a regular task, while looking at the source of the thread pool. It was as if I had opened the door to a new world of thread pool execution logic and some art of thread pool design.

As a designer, I want to take you through the process of designing and implementing a thread pool from scratch, step by step, thoroughly understanding thread pool implementation and some of the art of design.

The purpose and significance of thread pools

We need to understand that any technology is there to solve a problem. So what problem does thread pool solve? The answer is: it solves the problem of thread resource reuse.

Without a thread pool, we start a new thread to execute a task, and when the task is complete, the thread stops. If this task is repeated, it is not possible to create a new thread every time, which is a waste. Moreover, threads are scarce resources, and repeated creation and destruction is time-consuming.

With a thread pool, we can create several fixed threads. Wake up the idle thread when a task comes to deal with it. After the task is processed, the thread can continue to deal with the subsequent task. If there is no task for the time being, the thread can sleep and wake up when there is a new task. In this way, thread resources can be used more efficiently and system concurrency efficiency can be improved.

Task: Abstract unit of work

In a program, it is structured around the execution of tasks, which are usually abstract units of work. Think of an HTTP request as a task, an interaction with a database as a task, and so on. In a thread pool, we abstract what we need to process into a task unit, which simplifies the structure of the thread pool and makes it easier to model the thread pool.

Threads: Abstract workers

In a thread pool, we can think of each thread as a worker, meaning “worker”. It will constantly try to get tasks to perform, and if there are no tasks, it will sleep or do other processing.

Functional design of thread pools

So, what functions do thread pools usually have and provide? Here are the core functional requirements listed:

Opening and closing of thread pools

Thread pools, as a tool, need to have their own life cycle, which can be abstracted into three:

  • open
  • Running state
  • End state

After executing the close interface of the thread pool:

  • What about running tasks?
  • What about tasks in the task queue?
  • Can the thread pool continue to add tasks?

These are things to consider and deal with. ExecutorService in Java provides two shutdown interfaces

  • shutdown: Orderly shutdown. Submitted tasks are processed one by one, but no new tasks are accepted
  • shutdownNow: Tries to stop all executing tasks, abandons tasks waiting in the queue, and returns a list of tasks waiting to be executed

Thread construction and management

How should the threads in the thread pool be built, how should they be managed after building, whether they are fixed or dynamic. Here are a few patterns:

  1. Fixed number of threads: A fixed number of thread pools are built as soon as the pool is started and no threads are closed
  2. Dynamic build threads: No new threads are created at startup. Threads are created when tasks come in. If the number of tasks is small, no more threads are created. If the number of tasks is large, the number of threads continues to be built until the maximum number is reached.
  3. A thread with an idle period: A thread has an idle period when it is built, and when it is idle for more than that period, it will recycle. This is commonly used in database connection pooling
  4. A single thread: There is only one thread, and the tasks are executed in the chronological order of submission.

Task management

In the thread pool, a task queue is created. When there are no idle threads, new tasks are placed in the queue and wait for the thread to execute them.

Thread pools provide an interface for task execution.

In addition, many tasks will return the result of processing as a value, so tasks need to have a post-processing mechanism to do something when the task is complete. (This is where FutureTask comes in)

Task-related functions are as follows:

  • Task submission
  • Task Processing result
  • Cancellation and interruption of tasks

Construction of the thread pool model

Comb through some of the basic functions of the thread pool and to consider the point, so how is the implementation of the thread pool, how to design it. Without further ado, go straight to the picture above:

When there is a new task, check whether there is an idle thread. If there is a new task, process it directly. If there is no new task, put it into the task queue and wait for the thread to process it.

In fact, if we look at the thread pool, we can find that its logic is not complicated. What is complex is the handling of various situations, such as how to manage threads, how to handle task cancellations, how to handle thread interrupts and so on, as well as the handling of various concurrent operations.

Use code to implement simple thread pools

We then implement a fixed number of thread pools to do the following:

The interface to be provided by the thread pool

  • Task submission

The functionality to be implemented inside the thread pool

  • Implementation of task queue
  • Thread management

Let’s briefly implement the core functions of the thread pool, understand the execution logic of the thread pool, and slowly add other things later.

Create a task unit

First, the task unit to implement, directly implement the Runnable interface can be.

Of course, you can write a class that doesn’t implement the Runnable interface, but that’s not enough for a thread pool. Instead, you can implement the Runnable interface, and then you can delegate the task of implementing the interface to the thread pool.

static class Task implements Runnable{
    
        private int tag;

        public Task(int tag){
            this.tag = tag;
        }

        @Override
        public void run(a) {
            System.out.printf("Task %d starts executing \n",tag);
            System.out.printf("Task %d executing \n",tag);
            System.out.printf("Task %d completed \n",tag); }}Copy the code

Implementation of thread pools

Detailed instructions are in the comments, just read the comments

package steap1;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPoolExecutor {

    // Array of worker threads
    private Worker[] workers;

    // The task block queue is thread-safe, where every operation is locked
    private BlockingQueue<Task> queue;

    // The number of current worker threads
    private int workerSize = 0;

    // The maximum number of worker threads in the thread pool
    private int poolSize;

    public ThreadPoolExecutor(int poolSize, BlockingQueue<Task> queue) {
        this.poolSize = poolSize;
        this.workers = new Worker[poolSize];
        this.queue = queue;
    }

    // Task add interface
    public void execute(Task task) {
        // If the number of threads in the thread pool is less than the maximum, threads are added
        // Otherwise, put the task in the queue
        if (workerSize < poolSize) {
            addWorker(task);
        } else {
            this.queue.add(task); }}// Add the worker thread and execute it immediately
    private synchronized void addWorker(Task task) {
        // Check whether the number of threads is less than the maximum
        if (workerSize >= poolSize) {
            this.queue.add(task);
            return;
        }

        // Build the worker and start the thread
        workers[workerSize] = new Worker(task);
        workers[workerSize].t.start();

        workerSize++;
    }

    // The actual running code
    void runWorker(Worker worker){
        Task task =(Task) worker.task;
        try {
            while (true) {// The thread is constantly fetching tasks to execute in this loop
                The queue.task() method is a thread-safe blocking method
                // If there are no tasks in the queue, all worker threads will block here, waiting to get available tasks
                if(task == null){
                    task = this.queue.take();
                }
                task.run();
                task = null; }}catch(InterruptedException e){ e.printStackTrace(); }}// The worker thread wrapper class
    private class Worker implements Runnable {
        private Runnable task;

        final Thread t;

        public Worker(Runnable task) {
            this.task = task;
            this.t = new Thread(this);
        }

        @Override
        public void run(a) {
            runWorker(this); }}/ / task
    static class Task implements Runnable {

        private int tag;

        public Task(int tag) {
            this.tag = tag;
        }

        @Override
        public void run(a) {
            System.out.printf("Task %d starts executing \n", tag);
            System.out.printf("Task %d executing \n", tag);
            System.out.printf("Task %d completed \n", tag); }}}Copy the code

Simple use

    public static void main(String[] args){
        ThreadPoolExecutor executor = new ThreadPoolExecutor(8.new LinkedBlockingQueue<>());
        for(int i=0; i<1000; i++){ executor.execute(newThreadPoolExecutor.Task(i)); }}Copy the code

The execution result

task923Start a mission923Task in progress923End Task912Start a mission912Task in progress912Perform the endCopy the code

conclusion

At this point, a simple thread pool is written, the main functions of the thread pool are realized, the whole execution process is also described in detail.

In fact, there are a lot of things not covered here, thread lifecycle management, task cancellations, thread interrupts, etc., which will be addressed in the next chapter.

The source code of the project is attached at the end. The content of this chapter is in step1.