This is the 16th day of my participation in the August More Text Challenge. For details, see:August is more challenging

The thread pool

1. ThreadPoolExecutor

  1. What is a thread pool:
    1. Two collections maintained in a thread pool: one is a collection of threads; One is a set of tasks.
  2. How to define a thread pool:
    1. The thread pool has seven parameters:
      1. CorePoolSize: Indicates the number of core threads
        1. The core thread will always exist, even if no task is executed, if no timeout is set;
        2. When the number of threads is less than the number of core threads, even if there are idle threads, threads will be created until the number of core threads is reached. When the number of threads in the thread pool reaches the corePoolSize, the incoming task is placed in the cache queue. The core thread exits when allowCoreThreadTimeout is set to true and does not exit by default.
        3. AllowCoreThreadTimeout =true (false by default) will timeout the core thread.
      2. MaximumPoolSize: Indicates the maximum number of threads in the thread pool
        1. The maximum number of threads allowed in a thread pool
        2. When the task queue is full and the number of threads is greater than or equal to the number of core threads, a new thread is created to execute the task. Until the number of threads reaches maxPoolSize. If the number of threads is equal to maxPoolSize and the task queue is full, the pool has exceeded the processing capacity, and the pool will refuse to process the task and throw an exception.
      3. KeepAliveTime: indicates the lifetime of an idle thread
        1. When the idle time of the thread reaches keepAliveTime, the thread exits (closes) until the number of threads equals the number of core threads;
        2. If allowCoreThreadTimeout=true, the thread exits until the number of threads equals zero.
      4. Unit: Unit of the duration of a spatial thread
      5. WorkQueue: indicates the workQueue
        1. After a new task is submitted, it is first added to the work queue and then removed from the queue when the task is scheduled. There are four types of work queues available in the JDK:
          1. ArrayBlockingQueue: Array-based bounded blocking queue, sorted by FIFO. As new tasks come in, they are placed at the end of the queue, and the bounded array prevents resource exhaustion. When the number of threads in the thread pool reaches the corePoolSize and a new task comes in, the task is placed at the end of the queue to be scheduled. If the queue is already full, a new thread is created, and if the number of threads has reached maxPoolSize, a rejection policy is executed.
          2. LinkedBlockingQuene: Unbounded blocking queue based on a linked list (actually interger.max), sorted by FIFO. Due to the approximate unbounded nature of the queue, when the number of threads in the pool reaches corePoolSize, new tasks will be stored in the queue and will not be created up to maxPoolSize. Therefore, maxPoolSize is not valid when using the work queue.
          3. SynchronousQuene: a blocking queue of uncached tasks placed by a producer until fetched by a consumer. That is, when a new task comes in, it is not cached, it is scheduled to execute the task directly, if no threads are available, a new thread is created, and if the number of threads reaches maxPoolSize, a rejection policy is executed.
          4. PriorityBlockingQueue: An unbounded blocking queue with a priority implemented by the Comparator parameter.
      6. ThreadFactory: threadFactory
        1. The factory used when creating a new thread can be used to set the thread name, whether it is a daemon thread, and so on
      7. Handler: rejects the policy
        1. When the work queue has reached its maximum number of tasks, and the number of threads in the thread pool has reached its maximum number, what should be done if a new task is submitted? There are four rejection policies available in the JDK:
          1. CallerRunsPolicy: in this policy, the run method of the rejected task is executed directly in the caller thread, unless the thread pool is shutdown, then the task is discarded directly.
          2. AbortPolicy: under this strategy, discarding the task directly, and throw RejectedExecutionException anomalies.
          3. DiscardPolicy: Under this policy, tasks are discarded and nothing is done.
          4. DiscardOldestPolicy: This policy discards the first queued task and then attempts to queue the rejected task

          1. Source code parsing (from:www.jianshu.com/p/a977ab670…)

1. Based on the source code entry for analysis, first look at the execute method

      public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {//1. The number of threads in the current pool is less than the number of cores. Create a thread to execute the task
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//2. The core pool is full, but the task queue is not full. Add the task to the queue
                int recheck = ctl.get();
                // After the task has been successfully added to the queue, check again to see if a new thread needs to be added, as existing threads may have been destroyed
                if (! isRunning(recheck) && remove(command))
                    reject(command);// If the thread pool is not running and the current task has been successfully removed from the task queue, reject the task
                else if (workerCountOf(recheck) == 0)// If the previous thread has been destroyed, create a new thread
                addWorker(null.false);
            }
            else if(! addWorker(command,false)) //3. The core pool is full, the queue is full, try to create a new thread
                reject(command); // If creating a new thread fails, the thread pool is closed or the thread pool is completely full, and the task is rejected
        }
Copy the code

2. The role of CTL

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
Copy the code

It is an atomic class that holds the number of threads and the state of the pool. An int is 32 bits. The higher 3 bits hold the state, and the lower 29 bits hold the number of threads. CtlOf (int rs,int WC) is called by default;

private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

RUNNING =-1 << COUNT_BITS; -1 moves 29 bits to the left. The binary of -1 is 32 1s (1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111).

-1 binary calculation method source code is 1000… 001. The high position 1 indicates the sign bit. And then you invert the original code, keeping the high order, and you get 1111… 110 Then add +1 to the inverse code, which is the complement operation, and you get 1111… 1111

So -1 << moved 29 places to the left, that is [111]; Rs | wc. 111 | 000 binary. It’s still going to be 111 and the same thing is going to be true for the other states

private static final int COUNT_BITS = Integer.SIZE - 3; / / 32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // Shift the binary of 1 29 bits to the right, and subtract another 1 to indicate the maximum thread capacity
// The health status is stored 3 bits higher than the int value (all values are shifted 29 bits to the left)
private static final int RUNNING = -1 << COUNT_BITS;// Receive new tasks and execute the tasks in the queue
private static final int SHUTDOWN = 0 << COUNT_BITS;// No new tasks are received, but tasks in the queue are executed
private static final int STOP = 1 << COUNT_BITS;// Do not receive new tasks, do not execute tasks in the queue, and interrupt ongoing tasks
private static final int TIDYING = 2 << COUNT_BITS; // All tasks are finished and the number of threads is zero. Pools in the state will call terminated() methods
private static final int TERMINATED = 3 << COUNT_BITS;// Terminated () method execution is complete
Copy the code

3. addWork

  1. If the number of worker threads is less than the number of core threads, addWorker is called, which, as the name implies, creates a worker thread. Let’s look at the source code implementation

  2. The source code is longer, but it does two things.

    • The loop CAS operation is used to increase the number of threads by 1.
    • Create a new thread and enable it.
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry: //goto statement to avoid infinite loops
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                // Check if queue empty only if necessary.
                /* If the thread is not running, the thread is not SHUTDOWN, the firstTask is not empty, and the workQueue is empty, return false. 2. (Second judge) The shutdown state does not accept new tasks, but the tasks that have been added to the task queue will still be executed. Therefore, when the thread pool is in shutdown state and the incoming tasks are empty and the task queue is not empty, If this condition is reversed, worker*/ is not allowed to be added
                if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                                firstTask == null&&! workQueue.isEmpty()))return false;
                for (;;) { / / spin
                    int wc = workerCountOf(c);// Obtain the number of Worker threads
                    // If the number of worker threads is greater than the default size or greater than the number of core threads, return false for noCan add worker again.if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))// Use cas to increase the number of worker threads. If CAS fails, retry directly
                        break retry;
                    c = ctl.get(); // re-read CTL // Obtain the value of CTL again
                    if(runStateOf(c) ! = rs)// If the thread is not equal, the thread state has changed, retry again, double check
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop}}// The above code is mainly to do atomic +1 operation on the number of workers, the following logic is to formally build a worker
    
    
            boolean workerStarted = false; // The id of whether the worker thread is started
            boolean workerAdded = false; // Whether the worker thread has added a successful identifier
            Worker w = null;
            try {
                w = new Worker(firstTask); // Build a Worker. What is the Worker? We can see that a Runnable object is passed into the constructor
                final Thread t = w.thread; // Fetch the thread from the worker object
                if(t ! =null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock(); // There is a reentrant lock to avoid concurrency problems
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
                        // The current thread pool can be added to workers only if it is in the running state [or SHUTDOWN and firstTask is empty]
                        if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                            // If the task has just been encapsulated into the work, the thread is alive. There must be an exception thrown
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w); // Add the newly created Worker to the workers set
                            int s = workers.size();
                            // If the number of worker threads in the collection is greater than the maximum number of threads, the maximum number of threads represents the maximum number of threads the pool has ever had
                            if (s > largestPoolSize)
                                largestPoolSize = s; // Update the maximum number of threads in the thread pool
                            workerAdded = true;// Indicates that the worker thread was created successfully}}finally {
                        mainLock.unlock(); / / releases the lock
                    }
                    if (workerAdded) {// If the worker is added successfully
                        t.start();// Start the thread
                        workerStarted = true; }}}finally {
                if (! workerStarted)
                    addWorkerFailed(w); // If the addition fails, one thing you need to do is to decrement the actual number of worker threads (remember we increased the number of worker threads initially).
            }
            return workerStarted;// Return the result
        }
    Copy the code

    We find that the addWorker method simply constructs a Worker and encapsulates the firstTask into the Worker

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
            private static final long serialVersionUID = 6138294804551838833L;
            /** * Thread this worker is running in. Null if factory fails. */
            final Thread thread; }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
            /** * Initial task to run. Possibly null. */
            Runnable firstTask; // This is the task to execute
            /** * Per-thread task counter */
            volatile long completedTasks; // Number of completed tasks for thread pool statistics
    
            Worker(Runnable firstTask) {
                setState(-1); // The initial state is -1 to prevent the thread from being interrupted before calling runWorker(), that is, before actually executing the task.
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            public void run(a) {
                runWorker(this); }... }Copy the code
    1. Each worker is a thread, and it contains a firstTask, that is, the task to be executed first during initialization
    2. The runWorker() method ultimately executes the task
  3. The Worker class inherits AQS and implements the Runnable interface. Note the firstTask and Thread attributes: firstTask uses it to hold incoming tasks; Thread is the thread created with the ThreadFactory when the constructor is called. It is the thread used to process the task.

  4. When you call the constructor, you need to pass in the task, in this case by getThreadFactory().newThread(this); To create a newThread, newThread is passed this, because the Worker itself inherits the Runnable interface, that is, a thread, so a Worker object starts by calling the run method in the Worker class.

    Worker inherits AQS and uses AQS to realize exclusive locking. Why not use ReentrantLock? TryAcquire, which does not allow reentrant, but ReentrantLock, which does:

    The lock method acquies an exclusive lock, indicating that the current thread is executing a task. So it does a couple of things

    1. If the task is executing, the thread should not be interrupted;
    2. If the thread is not in the state of exclusive lock, that is, idle, it is not processing the task, then you can interrupt the thread.
    3. When the thread pool executes the shutdown or tryTerminate methods, it calls the interruptIdleWorkers method to interrupt idle threads. The interruptIdleWorkers method uses the tryLock method to determine whether a thread in the thread pool is idle;
    4. We set it to non-reentrant because we don’t want the task to re-acquire the lock when calling a thread pool control method like setCorePoolSize, which would interrupt a running thread;