In the last article, Xiao Bao talked about the process of thread pool instantiation and briefly introduced the state transition of thread pool. This article focuses on the minor issues I encountered when running thread pools and the understanding of the execute method source code.

4 is not a difficult Bug

The next step in our plan would be to submit a task, exploring the internal behavior of the thread pool as it executes a task, but first, I need to submit a task. So, following up on the code from the previous article, I submitted a task:

@Test
public void submitTest(a) {
    // Create a thread pool
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5.10.60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return newThread(); }},new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(Denial of service); }});// Submit a task that prints Hello after sleeping for 1 second
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call(a) throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null; }}); }Copy the code

I didn’t see any output, and the program didn’t sleep for a second. It ended immediately. Oh yes, I remember, the thread we created is a daemon thread by default, and when all user threads are finished, the program ends, regardless of whether there are any daemon threads running. So let’s solve this problem with a simple solution — let the user sleep a little longer by not letting the thread end:

@Test
public void submitTest(a) throws InterruptedException {
    // Create a thread pool
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5.10.60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return newThread(); }},new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(Denial of service); }});// Submit a task that prints Hello after sleeping for 1 second
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call(a) throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null; }});// Sleep the main thread for 5 seconds to prevent the daemon thread from exiting unexpectedly
    Thread.sleep(5000L);
}
Copy the code

However, the program waited five seconds and still had no output. My first reaction was that I was using thread pools incorrectly. Do you also need to call some method to “activate” or “start” the thread pool? I didn’t find a similar approach either in the documentation or in the blog examples. If we think about this Bug, there are three possible causes:

  1. ThreadPoolExecutorThere is a problem with the internal code
  2. I amThreadPoolExecutorIs not used in the right way
  3. I designThreadFactoryRejectedExecutionHandlerThere is a problem

Reason 1: It’s very unlikely. It’s almost impossible. So for reasons 2 and 3, we can’t rule them out right now, so I try to build a minimal reproducible error, strip ThreadPoolExecutor out, and see if the Bug reappears:

Minimal reproducible is the idea used by the author when I translated Developing a Simple Web Application using Rust, Part 4 CLI Option resolution. When we cannot locate the Bug, we peel out the irrelevant parts in the current code, observe whether the Bug reappears after stripping, and narrow the scope of the Bug step by step. In popular terms, it’s elimination.

private class MyThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        return newThread(); }}@Test
public void reproducibleTest(a) throws InterruptedException {
    new MyThreadFactory().newThread(new Runnable() {
        @Override
        public void run(a) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Hello");
        }
    }).start();
    Thread.sleep(5000L);
}
Copy the code

There is still no output, but this is good news, which means we have located the problem: the problem is now only possible in MyThreadFactory, so what’s wrong with 6 lines of code? I didn’t pass the Runnable r to the new Thread(). I was executing an empty Thread. Return new Thread(r); That would be a good change.

5 refactoring

The above problem seems to be simple, but can appear so low-level mistakes, worth my thinking. There are two reasons for this error:

  1. I don’t understandThreadPoolExecutorIn terms of grammarThreadFactoryOnly one implementation class needs to be passed outThreadI don’t know what an example isRunnable rIndispensable.
  2. The test code is unstructured. Even if it is a test code, it should not be written as noodles.

So I decided to refactor the test code. In this refactoring, the thread factory should produce non-daemons to prevent all threads in the thread pool from quitting unexpectedly due to the exit of the master process. Second, we must call log for each operation, we observed the thread pool to intuitive in what to do, it is worth mentioning that for blocking queue log operation, I use the way of dynamic proxy for each method call log, not familiar with the dynamic proxy’s shoes can be written before poking me small leopard take you to see the source code: the JDK dynamic proxy.

// import... Public class ThreadPoolExecutorTest {/** * record startup time */ private final static long START_TIME = system.currentTimemillis ();  /** * Custom thread factory, generate non-daemon thread, */ private class MyThreadFactory implements ThreadFactory {@override public Thread newThread(Runnable r) {Thread  thread = new Thread(r); thread.setDaemon(false);
            debug("Create thread - %s", thread.getName());
            returnthread; } /** * Custom denial of service exception handler, Print the denial of service information * / private class MyRejectedExecutionHandler implements RejectedExecutionHandler {@ Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { debug("Reject request, Runnable: %s, ThreadPoolExecutor: %s", r, executor); }} /** * Custom task, print the current thread name after 1 second sleep, */ Private class MyTask implements Callable<String> {@override public String Call () throws InterruptedException { Thread.sleep(1000L); String threadName = Thread.currentThread().getName(); debug("MyTask - %s", threadName);
            returnthreadName; }} /** * Dynamic proxy to BlockingQueue, Implements all method calls to BlockingQueue by calling Log */ private class PrintInvocationHandler implements InvocationHandler {private final BlockingQueue<? > blockingQueue; private PrintInvocationHandler(BlockingQueue<? > blockingQueue) { this.blockingQueue = blockingQueue; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { debug("BlockingQueue - %s, parameter: %s", method.getName(), Arrays.toString(args));
            Object result = method.invoke(blockingQueue, args);
            debug("BlockingQueue - %s completes execution and returns: %s", method.getName(), String.valueOf(result));
            returnresult; }} /** * generates the BlockingQueue proxy class * @param BlockingQueue The original BlockingQueue * @param <E> any type * @returnDynamic proxy BlockingQueue, Log */ @SuppressWarnings("unchecked")
    private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
        return(BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<? >[]{BlockingQueue.class}, new PrintInvocationHandler(blockingQueue)); } /** * instantiate a core pool of 3, a maximum pool of 5, and a lifetime of 20s, using the above thread pool instances of the blocking queue, thread factory, and denial of service processor * @returnReturn ThreadPoolExecutor instance */ private ThreadPoolExecutornewTestPoolInstance() {
        returnnew ThreadPoolExecutor(3, 5, 20, TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()), new MyThreadFactory(), new MyRejectedExecutionHandler()); @param info @param arg/private void debug(String info, Object... arg) { long time = System.currentTimeMillis() - START_TIME; System.out.println(String.format(((double) time / 1000) +"-" + Thread.currentThread().getName() + "-"+ info, arg)); } /** * test instantiation */ private voidnewInstanceTest() { newTestPoolInstance(); } /** * test commit operation, commit 10 tasks */ private voidsubmitTest() {
        ThreadPoolExecutor threadPool = newTestPoolInstance();
        for (int i = 0; i < 10; i++) {
            threadPool.submit(new MyTask());
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutorTest test= new ThreadPoolExecutorTest(); test.submitTest(); }}Copy the code

Compile, run =>

0.047-main- create Thread- Thread-0 0.064-main- create Thread-1 0.064-main- create Thread-2 0.064-main- blockingqueue-offer [Java. Util. Concurrent. FutureTask @ 4 d7e1886] 0.064 - the main - BlockingQueue - offers complete, the return value is:true0.064 - the main - BlockingQueue - offer, parameters for: [Java. Util. Concurrent. FutureTask @ 3cd1a2f1] 0.065- main-blockingqueue-offertrue0.065 -main-blockingqueue-offer, parameter: [Java. Util. Concurrent. FutureTask @ 2 f0e140b] 0.065 - the main - BlockingQueue - offers complete, the return value is:true0.065 -main-blockingqueue-offer, parameter: [Java. Util. Concurrent. FutureTask @ 7440 e464] 0.065 - the main - BlockingQueue - offers complete, the return value is:true0.065 -main-blockingqueue-offer, parameter: [Java. Util. Concurrent. FutureTask @ 49476842] 0.065 - the main - BlockingQueue - offers complete, the return value is:true0.065 -main-blockingqueue-offer, parameter: [Java. Util. Concurrent. FutureTask @ 78308 db1] 0.065 - the main - BlockingQueue - offers complete, the return value is:true0.065 -main-blockingqueue-offer, parameter: [Java. Util. Concurrent. 27 c170f0 FutureTask @] 0.065 - the main - BlockingQueue - offers complete, the return value is:trueThe following table describes the functions of the mytask-1 task: 1 Null 1.65-thread 1-blockingqueue-take thread0-blockingqueue-take thread0-blockingqueue-take Null 1.65-thread - 2-blockingqueue-take null 1.65-thread - 0-blockingqueue-take java.util.concurrent.FutureTask@3cd1a2f1 1.065-Thread- 2-blockingqueue-take Java. Util. Concurrent. FutureTask @ 2 f0e140b - Thread - 1-1.065 BlockingQueue - take complete, the return value is: Java. Util. Concurrent. FutureTask @ 4 d7e1886 - Thread - 1-2.065 MyTask - 2.065 - Thread Thread - 1-2 - MyTask - Thread - 2 Null 2.065-thread 2-blockingqueue-take 2.065-thread 2-blockingqueue-take 2.065-thread 0-mytask-0 2.065-thread 1-blockingqueue-take 2.065-thread 2-blockingqueue-take 2.065-thread 0-mytask-0 2.065-thread 1-blockingqueue-take 2.065-thread 2-blockingqueue-take Null 2.065-Thread-0- blockingqueue-take null 2.065-Thread-0- blockingqueue-take Java. Util. Concurrent. E464 FutureTask @ 7440 - Thread - 2-2.065 BlockingQueue - take complete, the return value is: Java. Util. Concurrent. FutureTask @ 49476842 - Thread - 0-2.065 BlockingQueue - take complete, the return value is: Java. Util. Concurrent. Db1 FutureTask @ 78308 - Thread - 1-3.066 MyTask - 3.066 - Thread Thread - 1-2 - MyTask - Thread - 2 3.066-thread-0-mytask-0 3.066-thread-2-blockingqueue-take, null 3.066-thread-1-blockingqueue-take, null 3.066-thread-1-blockingqueue-take Null 3.066-thread -0- blockingqueue-take null 3.066-thread -2- blockingqueue-take Java. Util. Concurrent. 27 c170f0 FutureTask @ 4.067 Thread - 2 - MyTask - 4.067 Thread Thread - 2-2 - BlockingQueue - take, parameter is nullCopy the code

The log format is: time (s) – thread name – information

From the log output, we can learn:

  • When the queue is empty and the number of threads is less than the number of core threads, submitting the task triggers thread creation and executes the task immediately
  • When the core threads are busy, the resubmitted requests are stored in a blocking queue and wait for the idle threads to execute the tasks in the queue
  • There are always only three worker threads except the main thread
  • When the queue is empty and the worker thread is still running, the worker thread will block the queuetakeMethod blocks (this can be seen in the next few lines of the log, only the call log, not the call completion log)

This leads me to a question: Why are there only three threads? Isn’t my setting “core pool 3, maximum pool 5”? Why are there only three threads working?

6 the submit tasks

The method itself is very simple. It encapsulates the incoming parameter as a RunnableFuture instance, and then calls the execute method. Here is one of submit’s multiple overloaded methods:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
Copy the code

So, let’s move on to the execute code:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null.false);
    }
    else if(! addWorker(command,false))
        reject(command);
}
Copy the code

Let’s first explain the addWorker method, and for the moment we only need to know a few things to understand the execute code:

  • This method is used to create a new worker thread
  • This method is thread-safe
  • The first argument to this method is the first task to be executed by the new thread, and the second argument is whether to create a new core thread
  • This method returns if the new thread was created successfullytrueOtherwise returnfalse

So let’s go back to the execute code:

To help understand this, I drew a flow chart based on the code logic:

Now I understand that non-core threads are created to process tasks only if the wait queue fails to insert (i.e., the capacity limit is reached, etc.). That is, we use the LinkedBlockingQueue queue as the wait queue, so we don’t see non-core threads being created.

As you may have noticed, the whole process is unlocked. How can you ensure concurrency security? The addWorker, remove, and workqueue. offer methods are thread-safe, and the method does not need to be locked. In fact, there is a recheck on the thread pool state in addWorker, which returns false if the creation fails.

series

  • Java thread pool (I) origination & Planning
  • Java thread pool (2) instantiation
  • Java thread pool (3) Submit tasks
  • To be continued…

Little leopard is still a junior student, little leopard hopes you can “critically” read this article, the content of this article is not correct, inappropriate place of severe criticism, little leopard is very grateful.