An overview of the

What is a thread pool? Let’s take a look at the various thread pool inheritance relationships and get a bird’s eye view of the whole thing. Okay

Let’s look at the top-level interface

public interface Executor {# there is only one executionvoid execute(Runnable command);
}
Copy the code

ExecutorService adds a Submit method to the Executor, which can be used to pass data to the Callable interface

public interface ExecutorService extends Executor {

    <T> Future<T> submit(Callable<T> task);
}
Copy the code

Let’s look at the source logic for the implementation class

AbstractExecutorService

Public Abstract class AbstractExecutorService implements ExecutorService { RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } # wrap it as FutureTask and hand it to subclasses to implement public Future<? > submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }}Copy the code

ThreadPoolExecutor

The core idea of ThreadPoolExecutor is a bunch of workers pulling tasks in a blocking queue, as simple as thatLet’s take a look at the source code, starting with the inner classWorker

# worker inheritance, like AQS, is an exclusive lock that implements Runnable and can be put into threads for executionprivate final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{# run the current worker threadfinalThread thread; Runnable firstTask; # Number of completed tasksvolatile longcompletedTasks; # to createworker
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask; This line is important because it binds itself to runnable and threadthis.thread = getThreadFactory().newThread(this); } # The run method is enabled when thread startspublic void run(a) {
        runWorker(this); } # AQS = AQSprotected boolean tryAcquire(int unused) {
        if (compareAndSetState(0.1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true; }}Copy the code

It can be seen that Worker is an AQS lock and a runnable. It is bound to a thread. When the thread is opened, it will execute the run method. So when to start the thread, continue to look at the source code. CTL ThreadPoolExecutor uses the first three bits of int 32 bits to indicate the state of the thread pool and the last 29 bits to indicate the number of workers in the thread pool. We’re using bitwise operations to make judgments

public class ThreadPoolExecutor extends AbstractExecutorService {# ThreadPoolExecutor is usedint 32One of the top3Bit to indicate the state of the thread pool, followed29Bit represents the number of workers in the thread poolprivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { returnrs | wc; } # create a blocking queue from the thread poolprivate finalBlockingQueue<Runnable> workQueue; # set the Workerprivate final HashSet<Worker> workers = new HashSet<Worker>();
Copy the code

Then this is classic thread pool submission logic. The number of core threads is not full of create threads, full of put queue, queue is full of create maximum threads, and full of deny policy

Public void execute(Runnable command) {if (command == null) throw new NullPointerException(); Int c = ctl.get(); If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) return; c = ctl.get(); If (isRunning(c) &&workqueue.offer (command)) {int recheck = ctl.get(); # check the status again, if it does not run then kick out the task and reject the policy if (! isRunning(recheck) && remove(command)) reject(command); Else if (workerCountOf(recheck) == 0) addWorker(null, false); } # add queue failed to take maximum number of threads logic else if (! addWorker(command, false)) reject(command); }}Copy the code

So let’s look at how thread pools create thread addWorker methods

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for(;;) {# show the statusint c = ctl.get();
        
        intrs = runStateOf(c); Check thread pool status, if not, returnfalse
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null&&! workQueue.isEmpty()))
            return false;

        for(;;) {# get the current thread countintwc = workerCountOf(c); # failed to create worker if current thread count exceeds core or maximum thread countif (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false; You can continue to create the thread and add the state by oneif (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if(runStateOf(c) ! = rs)continue retry;
            // else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try{# create worker object, remember that worker is a lock, or a runnable, which has a threadnew Worker(firstTask);
        final Thread t = w.thread;
        if(t ! =null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try{# check thread pool status againint rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null) {# if a thread in the worker is started by someone else, an exception is thrownif (t.isAlive()) // precheck that t is startable
                        throw newIllegalThreadStateException(); # add worker worker. Add (w);int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; }}finally{ mainLock.unlock(); } # If the worker is added to the thread pool, start the thread in the worker and the worker will execute the run methodif (workerAdded) {
                t.start();
                workerStarted = true; }}}finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Copy the code

As you can see, the method and core logic is to create a Worker object into a thread pool, and then start a thread to execute the methods in the Worker. So let’s take a look at what the run method in Worker actually does.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    tryGetTask () {task = getTask();while(task ! =null|| (task = getTask()) ! =null) {
            w.lock();

            if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try{# execute task.run(); }catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally{ afterExecute(task, thrown); }}finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally{ processWorkerExit(w, completedAbruptly); Get the task from the blocking queueprivate Runnable getTask(a) {
    boolean timedOut = false; // Did the last poll() time out?

    for(;;) {# another stack of state judgments to look atint c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue; } # retrieve tasks from a blocking queuetry {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if(r ! =null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; }}}Copy the code

At this point, the entire logic of ThreadPoolExecutor is clear. When a worker is added to the thread pool, it starts a thread, which waits to take data from the blocking queue, get the task, execute it, and continue to wait in the queue.

ScheduledThreadPoolExecutor

Let’s look at the basic use, ScheduledThreadPoolExecutor divided into delay can run a task, and timing to perform tasks

public static void main(String[] args) throws Exception {

    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10); # timing mission executor. ScheduleAtFixedRate (() - > {System. Out. Println ("haha");
    },0.2,TimeUnit.SECONDS); Executor.schedule (()->{system.out.println ()"fsfsf");
    },5,TimeUnit.SECONDS);

    System.in.read();
}
Copy the code

The usage is very simple, it should be noted that the delay time in the task is calculated according to the start time of the task, so add a delay of 2 seconds, the task is executed for 3 seconds, then after the execution, because it has exceeded 2 seconds, the task will be executed immediately, instead of waiting for 2 seconds. Let’s take a look at the code for submitting the task, which essentially creates a ScheduledFutureTask and puts it in a queue for consumption

publicScheduledFuture<? > scheduleAtFixedRate(Runnable command,long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw newIllegalArgumentException(); ScheduledFutureTask<Void> SFT =new ScheduledFutureTask<Void>(command,
                                      null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; DelayedExecute (t);returnt; } # queue tasksprivate void delayedExecute(RunnableScheduledFuture
        task) {
    if (isShutdown())
        reject(task);
    else{# find the queue and put it insuper.getQueue().add(task);
        if(isShutdown() && ! canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false);
        elseensurePrestart(); }}Copy the code

Given our prior knowledge of ThreadPoolExecutor, we know that the worker is constantly listening for tasks in the queue and then pulling them down to execute them. The next step is to look at the logic behind the run method in the ScheduledFutureTask

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { public void run() { # Boolean periodic = isPeriodic(); if (! canRunInCurrentRunState(periodic)) cancel(false); else if (! Periodic) # if not executed, regularly performs a ended ScheduledFutureTask. Super. The run (); The else {# timing tasks, the execution will not modify the state of the FutureTask here if (ScheduledFutureTask. Super. RunAndReset ()) {# set the next execution time setNextRunTime (); ReExecutePeriodic (outerTask); } } } void reExecutePeriodic(RunnableScheduledFuture<? > task) {if (canRunInCurrentRunState(true)) {super.getQueue().add(task); if (! canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); }}}Copy the code

The core idea is the logic that the worker throws back into the queue after completing the task