
Preface

I used to think thread pools were fairly simple (I use them regularly and have analyzed how they work), so this time I wanted to write one myself to understand them more deeply. In practice it was not as easy as I expected once I got down to the details. Comparing my code with the JDK source, I really have to admire Doug Lea.

I suspect most people only skim the source of java.util.concurrent.ThreadPoolExecutor: it involves a lot of detail, as well as some AQS, so analyzing it thoroughly is not easy.

Rather than walking through the source line by line, it is better to implement a simplified version yourself. Simplified does not mean lacking in functionality: the core logic must stay consistent with the original.

So that’s the purpose of this article:

Write a thread pool and learn how it works and how to use it in your work.

If you are not familiar with thread pools, please read these articles:

Here are a few excerpts from them, as background.

Creating a thread pool

Now to get down to business, we create a new class, CustomThreadPool, which works like this:

Simply put, you drop tasks into the thread pool, and the submitted tasks are buffered in a queue. The pool holds threads that keep fetching tasks from that queue and executing them.

The process is still pretty simple.

Let’s take a look at the effect of our own thread pool:

Initialize a thread pool with a core of 3, maximum number of threads of 5, and queue size of 4.

Ten tasks are submitted. The first three start the core threads and the next four fill the blocking queue (size 4); because the queue is then full, the pool grows to 5 threads, its upper limit.

After a period with no new submissions (the sleep), the pool automatically shrinks back to three threads (never fewer than the number of core threads).
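
For comparison, the same sizing can be tried against the JDK's own ThreadPoolExecutor. The following is only a sketch: the class name JdkSizingSketch, the method submitTen and the latch used to keep workers busy are assumptions made for illustration and do not come from the article's source.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's CustomThreadPool.
final class JdkSizingSketch {

    // Returns {pool size, queued tasks, rejected tasks} after submitting 10 blocked tasks.
    static int[] submitTen() throws InterruptedException {
        // core = 3, max = 5, queue capacity = 4, as in the article's demo
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < 10; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy while we measure
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy: all threads busy and the queue full
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();  // let the blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }
}
```

With these settings the JDK pool also grows to 5 threads and queues 4 tasks, but it rejects the tenth task under its default policy, whereas the pool in this article blocks the submitter until the queue has room.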

The constructor

Let’s see how this works.

Here is the thread pool constructor:

There will be the following core parameters:

  • miniSize: the minimum number of threads, equivalent to corePoolSize in ThreadPoolExecutor.

  • maxSize: the maximum number of threads.

  • keepAliveTime: how long an idle thread is kept alive.

  • workQueue: the blocking queue.

  • notify: the notification interface.

These are roughly the same parameters as in ThreadPoolExecutor, with similar effects.

Note that a workers member variable is initialized:

    /**
     * Holds the worker threads running in the pool
     */
    private volatile Set<Worker> workers;

    public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
        workers = new ConcurrentHashSet<>();
    }

workers is the set that ultimately holds the threads running in the pool. In the J.U.C source, workers is a plain HashSet, so every operation on it must be done under a lock.

For simplicity, I defined my own thread-safe Set called ConcurrentHashSet.

The principle is simple: like HashSet, it stores its data in a map and relies on the uniqueness of keys to implement a set, except that the map here is a ConcurrentHashMap, which makes it safe under concurrency.

This ensures that writes to it and removals from it are thread-safe.

However, since size() on ConcurrentHashMap is only an estimate while other threads are updating the map, I keep a separate AtomicInteger to count the elements.
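
A minimal sketch of such a set is shown here. It is not the article's actual ConcurrentHashSet: the class name ConcurrentHashSetSketch and the read-only iterator are assumptions chosen to keep the counter consistent.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a thread-safe set backed by ConcurrentHashMap.
final class ConcurrentHashSetSketch<T> implements Iterable<T> {
    private final ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
    private final AtomicInteger count = new AtomicInteger();

    public boolean add(T element) {
        // putIfAbsent returns null only when the key was not present before
        boolean added = map.putIfAbsent(element, Boolean.TRUE) == null;
        if (added) {
            count.incrementAndGet();
        }
        return added;
    }

    public boolean remove(T element) {
        // remove returns the previous value, or null if the key was absent
        boolean removed = map.remove(element) != null;
        if (removed) {
            count.decrementAndGet();
        }
        return removed;
    }

    public int size() {
        return count.get();
    }

    @Override
    public Iterator<T> iterator() {
        // Read-only view: removing through the iterator would bypass the counter
        return Collections.unmodifiableSet(map.keySet()).iterator();
    }
}
```

Because the counter is only touched when putIfAbsent or remove actually changes the map, two threads adding the same element cannot both increment it.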

Creating a core thread

Quite a lot happens when a task is dropped into the thread pool, but the most important part is creating a thread and storing it in the pool.

Of course we cannot create threads without limit, otherwise there would be no point in using a thread pool. That is what miniSize and maxSize are for.

But where do these two parameters come into play? That’s the first thing to be clear about.

As the flow chart shows, the first step is to check whether the current number of threads has reached the number of core threads; if not, a new thread is created.

In the code, this means that when a task is submitted, the pool compares the current number of workers with miniSize and creates a thread if it is still below that number.

The task execution part, worker.startTask(), will be analyzed later.

miniSize is also declared volatile for visibility, because it may be accessed from multiple threads.

The queue buffer

Following the flowchart, the second step is to check whether the queue can still hold the task (that is, whether it is full).

Tasks are placed in the queue first whenever possible.

Reaching the upper limit

If the write to the queue fails, the pool checks whether its current size has reached the maximum number of threads; if not, it creates another thread to execute the task.

Otherwise, it falls back to a blocking write to the queue (this is where J.U.C applies a rejection policy instead).
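
The decision order described in these steps can be sketched as a small pure function. The class SubmitFlowSketch, the Step enum and the decide method are hypothetical names used only for illustration; the real pool would create a worker or call put() where this sketch merely returns a label.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical helper that only reports which branch a submission would take.
final class SubmitFlowSketch {

    enum Step { NEW_CORE_THREAD, QUEUED, NEW_EXTRA_THREAD, BLOCK_ON_QUEUE }

    static Step decide(int workerCount, int miniSize, int maxSize,
                       BlockingQueue<Runnable> workQueue, Runnable task) {
        // Step 1: below the core size, always start a new worker for the task
        if (workerCount < miniSize) {
            return Step.NEW_CORE_THREAD;
        }
        // Step 2: otherwise prefer the queue; offer() returns false when it is full
        if (workQueue.offer(task)) {
            return Step.QUEUED;
        }
        // Step 3: the queue is full, so grow towards maxSize if there is still room
        if (workerCount < maxSize) {
            return Step.NEW_EXTRA_THREAD;
        }
        // Step 4: pool and queue are both saturated; the caller would block on put()
        return Step.BLOCK_ON_QUEUE;
    }
}
```

Feeding ten submissions through decide with miniSize 3, maxSize 5 and a queue of capacity 4 yields three core threads, four queued tasks, two extra threads and one blocking write, which matches the demo described earlier.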

These steps match the flow chart above. Do you see any pitfalls?

Always be careful

As you can see from these two steps in the flowchart above, new threads are created directly.

This is far more expensive than writing straight to the blocking queue in the middle step, for two main reasons:

  • Creating a thread involves locking: although the worker is ultimately written into a ConcurrentHashMap, that write can still take a lock.

  • A new thread has to be created, which also requires calling the operating system's API.

So ideally we should avoid these two steps and let tasks submitted to the pool go into the blocking queue whenever possible.

Executing a task

Once a task has been added, how is it executed?

The worker.startTask() function was mentioned earlier, when threads were created:

    private void addWorker(Runnable runnable) {
        Worker worker = new Worker(runnable, true);
        worker.startTask();
        workers.add(worker);
    }

That is, when a thread is created to execute a task, a Worker object is created and its startTask() method is called to run the task.

So let’s see what the Worker object looks like:

It is in fact a thread itself, and it receives the task to execute in its member variable task.

The key step is worker.startTask().

    public void startTask() {
        thread.start();
    }

In fact, it runs the worker thread itself, so let’s look at the run method.

  • First it executes the task that was passed in when the thread was created (task.run), then it keeps fetching tasks from the queue and executing them until no new task is available.

  • Each time a task completes, the built-in counter is decremented by 1, which later makes it possible to send a notification once all tasks have finished.

  • When the worker thread fails to get a task it exits, and it must remove itself from the pool (workers.remove(this)).
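
The three points above can be sketched roughly as follows. This is an illustration under assumptions, not the article's Worker class: WorkerLoopSketch is a hypothetical container, and its getTask() is a simplified stand-in that returns null as soon as the queue is empty.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical container for the worker loop; names are illustrative only.
final class WorkerLoopSketch {
    final Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>();
    final Set<Worker> workers = ConcurrentHashMap.newKeySet();
    final AtomicInteger totalTask = new AtomicInteger();

    // Simplified stand-in for getTask(): null means "no more work, exit"
    Runnable getTask() {
        return workQueue.poll();
    }

    final class Worker implements Runnable {
        private Runnable task;
        final Thread thread = new Thread(this);

        Worker(Runnable firstTask) {
            this.task = firstTask;
        }

        void startTask() {
            thread.start();
        }

        @Override
        public void run() {
            try {
                // Run the first task, then keep pulling from the queue until it yields null
                while (task != null || (task = getTask()) != null) {
                    try {
                        task.run();
                    } finally {
                        task = null;
                        totalTask.decrementAndGet(); // one fewer outstanding task
                    }
                }
            } finally {
                workers.remove(this); // the exiting worker removes itself from the pool
            }
        }
    }
}
```

In this sketch the worker is registered in workers before startTask() is called, so that the removal in run() can never happen before the registration.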

Get the task from the queue

getTask is also a key method: it encapsulates fetching tasks from the queue and, at the same time, reclaiming threads that no longer need to be kept.

Obviously, its core job is to get tasks from the queue, but there are two caveats:

  • When the number of threads exceeds the number of core threads, the task is fetched from the queue with the keepalive time as a timeout. If no task arrives within that time, the queue must be empty, so null is returned and the run() method above exits the thread. This is how a thread gets reclaimed, which is what we demonstrated earlier.

  • The lock is needed because this code is bound to run concurrently. Without it, several threads could pass the workers.size() > miniSize check at the same time, and all threads could end up being reclaimed.
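
A rough sketch of these two caveats follows. The class GetTaskSketch and the workerCount field are assumptions made for illustration; in particular, decrementing the count inside the lock is a choice of this sketch so that the size check and the decision to exit happen atomically.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a getTask() that reclaims surplus threads.
final class GetTaskSketch {
    final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    final AtomicInteger workerCount = new AtomicInteger();
    private final ReentrantLock lock = new ReentrantLock();
    private final int miniSize;
    private final long keepAliveTime;
    private final TimeUnit unit;

    GetTaskSketch(int miniSize, long keepAliveTime, TimeUnit unit) {
        this.miniSize = miniSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
    }

    Runnable getTask() {
        lock.lock();
        try {
            if (workerCount.get() > miniSize) {
                // Surplus thread: wait at most keepAliveTime for a task
                Runnable task = workQueue.poll(keepAliveTime, unit);
                if (task == null) {
                    workerCount.decrementAndGet(); // the caller will exit
                }
                return task;
            }
            // Core thread: block until a task arrives
            return workQueue.take();
        } catch (InterruptedException e) {
            return null; // interrupted while waiting: let the worker exit
        } finally {
            lock.unlock();
        }
    }
}
```

Holding the lock while waiting means idle workers take turns polling the queue. That keeps the size check consistent at the cost of some parallelism among idle threads.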

Closing the thread pool

Finally, closing the thread pool.

If we do not close the pool after submitting tasks, the application does not exit even after all tasks have completed.

The cause is the call task = workQueue.take(): once the pool is down to its core threads, those threads stay blocked there.

A thread dump confirms this:

Exactly three threads remain blocked there.

A thread pool is usually closed in one of two ways:

  • Immediate close: the close method stops everything at once, regardless of what the pool is currently doing; this leads to task loss.

  • Graceful close: the pool stops accepting new tasks, waits for existing tasks to complete, and then exits.

Immediate shutdown

Let’s start with the first immediate shutdown:

    /**
     * Close the thread pool immediately, which will cause task loss
     */
    public void shutDownNow() {
        isShutDown.set(true);
        tryClose(false);
    }

    /**
     * Close the thread pool
     *
     * @param isTry true:  try to close --> will wait for all tasks to complete
     *              false: close immediately --> tasks may be lost
     */
    private void tryClose(boolean isTry) {
        if (!isTry) {
            closeAllTask();
        } else {
            if (isShutDown.get() && totalTask.get() == 0) {
                closeAllTask();
            }
        }
    }

    /**
     * Close all tasks
     */
    private void closeAllTask() {
        for (Worker worker : workers) {
            //LOGGER.info("Start closing");
            worker.close();
        }
    }

    // Worker.close(): interrupt the underlying thread
    public void close() {
        thread.interrupt();
    }

As you can see, it ends up iterating over all worker threads in the pool and calling interrupt on each of them.
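
To see why an interrupt is enough to stop an idle worker, here is a small standalone sketch. InterruptCloseSketch and interruptIdleWorker are hypothetical names; the worker loop is a simplified stand-in for the pool's worker, which blocks on the queue in the same way.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of interrupting a worker that is blocked on take().
final class InterruptCloseSketch {

    // Returns true if the worker left its loop because of the interrupt.
    static boolean interruptIdleWorker() throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        AtomicBoolean exitedByInterrupt = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    workQueue.take().run(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                exitedByInterrupt.set(true); // take() was interrupted: stop the worker
            }
        });

        worker.start();
        worker.interrupt(); // what close() does for each worker
        worker.join();      // returns once the worker thread has ended
        return exitedByInterrupt.get();
    }
}
```

BlockingQueue.take() throws InterruptedException when the waiting thread is interrupted, so a worker parked on an empty queue ends its loop instead of waiting forever. A task that is already running is only stopped if it reacts to the interrupt itself.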

Let’s test it out:

You can see that the three tasks submitted last are never actually executed.

Shutting down after tasks complete

A normal shutdown is different:

    /**
     * Close the thread pool after the tasks complete
     */
    public void shutdown() {
        isShutDown.set(true);
        tryClose(true);
    }

It performs one extra check here and does not interrupt the threads until all tasks have been completed.

The pool also attempts this close whenever a worker thread is being reclaimed:
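
As a rough sketch of how these two call sites could fit together, the class below counts close attempts instead of interrupting real threads. GracefulCloseSketch, submitted, taskFinished, workerExiting and interruptCalls are hypothetical names introduced for illustration and are not the article's code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the graceful-close check; no real threads involved.
final class GracefulCloseSketch {
    final AtomicBoolean isShutDown = new AtomicBoolean(false);
    final AtomicInteger totalTask = new AtomicInteger();
    final AtomicInteger interruptCalls = new AtomicInteger(); // stands in for closeAllTask()

    void submitted() {
        totalTask.incrementAndGet();
    }

    void taskFinished() {
        totalTask.decrementAndGet();
    }

    // Called when a worker leaves its run loop and is being reclaimed
    void workerExiting() {
        tryClose(true);
    }

    void shutdown() {
        isShutDown.set(true);
        tryClose(true);
    }

    private void tryClose(boolean isTry) {
        // Close right away when not "trying"; otherwise only after shutdown
        // was requested and no task is outstanding
        if (!isTry || (isShutDown.get() && totalTask.get() == 0)) {
            interruptCalls.incrementAndGet(); // a real pool would interrupt every worker here
        }
    }
}
```

Calling shutdown() while tasks are still outstanding does nothing beyond setting the flag; the close only goes through on a later attempt, once the counter has dropped to zero.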

Take a look at this in action:

Thread recycling

Thread recycling has come up several times above; it boils down to the following two points:

  • Once shutdown/shutdownNow has been executed, the pool state is set to closed, so that whenever a worker thread attempts to fetch a task from the queue it gets null, and the worker thread is reclaimed.

  • Once the pool size exceeds the number of core threads, tasks are fetched from the queue with the keepalive timeout, so if none is retrieved in time, null is returned and recycling is triggered.

But if the queue is large enough that the number of threads never exceeds the number of core threads, no recycling is triggered.

For example, here I set the queue size to 10 so that tasks accumulate in the queue instead of creating five worker threads.

So threads 1 to 3 keep executing the tasks in rotation.

Conclusion

This time I have implemented most of the core functions of a thread pool. I believe that once you have read it and typed it out yourself, you will understand thread pools differently.

Combined with the current content to summarize:

  • Size the thread pool and the queue sensibly, so that tasks are executed from the queue as much as possible.

  • Use shutdownNow() with caution: it causes task loss (unless the business allows that).

  • If there are many tasks and each one runs only briefly, you can increase keepAliveTime so that threads are reused rather than reclaimed.

Next time we will share some new thread pool features, such as:

  • Executing tasks that return a value.

  • How to handle exceptions?

  • How to get notified once all tasks have finished?

All source code of this article:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java