Executor & Overview

Executor is a top-level interface. An overview of thread pools is shown below:

A newly submitted task is first given to a core thread, as long as fewer than corePoolSize threads exist. If all core threads are busy, the task is added to the work queue. If the work queue is full, the pool keeps creating threads until maximumPoolSize is reached. After that, the RejectedExecutionHandler policy is executed.

ThreadFactory is the factory that creates the thread for each worker.

The benefits of using thread pools are as follows:

  • Reuse threads and control the maximum level of concurrency.

  • Provide a task queue that buffers work, plus a rejection mechanism.

  • Implement time-related functions such as timing execution and periodic execution.

  • Isolate the threading environment.

    • For example, suppose two thread pools are opened, one for the transaction service and one for the search service. Transaction threads clearly consume more resources, so a separate thread pool isolates the slower transaction service from the search service and prevents their threads from interfering with each other.

A typical use of Executor looks like this (ExecutorSubClass stands for any implementation):

Executor executor = new ExecutorSubClass();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

The Executors class provides factory methods for executors. ExecutorService is a sub-interface of Executor that adds lifecycle management and task submission, and Executors offers several ways to create a thread pool that implements it.

2 Implement thread pool in Executors

2.1 CachedThreadPool

public static void main(String[] args) {
    ExecutorService service = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        service.execute(new TestThread());
    }
    service.shutdown();
}

CachedThreadPool creates a new thread for a task whenever no idle thread is available, and reuses previously created threads once they become free.

ExecutorService objects are created through the static factory methods of Executors; the method you call determines the type of executor. Calling shutdown() prevents new tasks from being submitted to the ExecutorService, which terminates once all previously submitted tasks are complete.

2.2 FixedThreadPool

A FixedThreadPool runs tasks on a fixed, limited number of threads.

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
        service.execute(new TestThread());
    }
    service.shutdown();
}

With a FixedThreadPool the costly thread allocation is paid at most once per pool thread, and the number of threads is bounded. You save time because you do not pay the overhead of creating a thread for every task.

2.3 SingleThreadExecutor

SingleThreadExecutor is like a FixedThreadPool with a single thread. If multiple tasks are submitted to a SingleThreadExecutor at once, they are queued and all of them run on the same thread. SingleThreadExecutor serializes all tasks submitted to it and maintains its own hidden queue of pending tasks.

public static void main(String[] args) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 5; i++) {
        service.execute(new TestThread());
    }
    service.shutdown();
}

You can use SingleThreadExecutor to ensure that there is only one task running at any one time.

2.3.1 newSingleThreadExecutor() and newFixedThreadPool(1)

Given the above introduction, the question naturally arises: why does newSingleThreadExecutor exist when we already have newFixedThreadPool?

The Javadoc of newSingleThreadExecutor says: "Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads." So there is indeed a difference between newSingleThreadExecutor() and newFixedThreadPool(1): the thread pool returned by newSingleThreadExecutor is guaranteed not to be reconfigurable (resizing the pool, etc.).

Comparison source code:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPool returns a ThreadPoolExecutor object, whereas newSingleThreadExecutor returns a ThreadPoolExecutor wrapped in a FinalizableDelegatedExecutorService. The difference comes from that wrapper.

  • The properties (pool size, etc.) of a FixedThreadPool of size 1 can be reconfigured by casting it to ThreadPoolExecutor;
  • A SingleThreadExecutor is actually a FinalizableDelegatedExecutorService object. That class does not expose methods such as setCorePoolSize and does not inherit from any configurable thread pool class, so it is guaranteed that it cannot be reconfigured.
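The difference described in the two bullets can be observed directly. The following is a minimal sketch; the class name ReconfigureDemo and the helper canResize are made up for illustration, and the target size 4 is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class; the names are not from the JDK or the article.
class ReconfigureDemo {

    /** Returns true if the executor can be cast to ThreadPoolExecutor and resized. */
    static boolean canResize(ExecutorService service) {
        try {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            pool.setMaximumPoolSize(4); // raise the maximum first so core <= maximum holds
            pool.setCorePoolSize(4);
            return pool.getCorePoolSize() == 4;
        } catch (ClassCastException e) {
            return false; // the wrapper hides the configuration methods
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("newFixedThreadPool(1) resizable: "
                + canResize(Executors.newFixedThreadPool(1)));
        System.out.println("newSingleThreadExecutor() resizable: "
                + canResize(Executors.newSingleThreadExecutor()));
    }
}
```

The cast succeeds for the fixed pool and fails with a ClassCastException for the single-thread executor, because the latter is only reachable through its delegating wrapper.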

2.4 SingleThreadScheduledExecutor

Create a single-threaded thread pool that can perform tasks periodically.

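import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TestMain {
    // Date formatter
    static SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    static AtomicInteger number = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        for (int i = 0; i < 3; i++) {
            // Each task is scheduled to start 3 seconds from now
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task " + number.incrementAndGet()
                            + " running at [" + sim.format(new Date()) + "]");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, 3L, TimeUnit.SECONDS);
        }
        System.out.println("Main thread running at [" + sim.format(new Date()) + "]");
        // Already scheduled delayed tasks still run; without this the JVM never exits
        executorService.shutdown();
    }
}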

2.5 ScheduledThreadPool

A ScheduledThreadPoolExecutor can schedule commands to run after a given delay, or to execute periodically.

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2);
scheduled.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println("time:");
    }
}, 0, 40, TimeUnit.MILLISECONDS);
// 0 is the delay before the first execution, 40 is the period between
// executions, and TimeUnit.MILLISECONDS is the unit of both values

5 ThreadPoolExecutor

5.1 Overview

The methods described above are not recommended. Alibaba's Java coding guidelines say:

Thread pools must not be created with Executors. Create them through ThreadPoolExecutor instead. This makes the author more aware of the rules by which the thread pool runs and avoids the risk of exhausting resources.

The specific reasons are:

  • FixedThreadPool and SingleThreadExecutor place no practical limit on the length of the task queue: it can hold up to Integer.MAX_VALUE tasks.
  • The maximum number of threads in CachedThreadPool and ScheduledThreadPool is Integer.MAX_VALUE.
  • These methods therefore tend to cause an OOM when too many tasks pile up in the queue or too many threads are created.
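The limits named in the list can be read back from the pools that Executors creates. This is a small sketch for inspection only; the class and method names are invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class used only to inspect the defaults.
class ExecutorsLimitsDemo {

    /** Remaining queue capacity of a fresh newFixedThreadPool(nThreads). */
    static int fixedPoolQueueCapacity(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            return ((ThreadPoolExecutor) service).getQueue().remainingCapacity();
        } finally {
            service.shutdown();
        }
    }

    /** Maximum pool size of a fresh newCachedThreadPool(). */
    static int cachedPoolMaxThreads() {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) service).getMaximumPoolSize();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity(2));
        System.out.println("cached pool max threads:   " + cachedPoolMaxThreads());
        System.out.println("Integer.MAX_VALUE:         " + Integer.MAX_VALUE);
    }
}
```

Both values equal Integer.MAX_VALUE, which is why a slow consumer can let the queue or the thread count grow until memory runs out.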

When we look at the source of Executors, methods such as Executors.newFixedThreadPool, Executors.newSingleThreadExecutor, Executors.newCachedThreadPool and Executors.newScheduledThreadPool are all backed by ThreadPoolExecutor; periodic tasks additionally rely on DelayedWorkQueue. Since creating thread pools through these methods is not recommended, it is worth studying ThreadPoolExecutor and building custom thread pools with it, so that you understand exactly how the pool runs and avoid the risk of running out of resources.

The core parameters of ThreadPoolExecutor are the ones passed to its constructor:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

  • int corePoolSize: the number of resident core threads in the thread pool

    • If set to 0, all threads in the pool are eventually destroyed when there are no tasks.
    • If greater than 0, up to this many threads are kept alive even when there are no tasks.

Note that if this value is too small, threads are created and destroyed frequently. If it is too large, system resources are wasted.

  • int maximumPoolSize: the maximum number of threads that the pool may create

This parameter must be greater than 0 and greater than or equal to corePoolSize. It only comes into play when there are many tasks and the task queue is full.

  • long keepAliveTime: how long an idle thread is kept alive

When a thread has been idle for longer than this time, it is destroyed, until the number of threads in the pool equals corePoolSize. If maximumPoolSize equals corePoolSize, the pool does not destroy any threads when idle.

  • TimeUnit unit: the unit of keepAliveTime. It is used together with keepAliveTime.
  • BlockingQueue<Runnable> workQueue: the queue of tasks waiting to be executed by the pool

When all threads in the pool are busy, new tasks are stored in this queue and wait for execution.

  • ThreadFactory threadFactory: the factory that creates threads

This parameter is rarely set explicitly. If it is not specified when the pool is created, Executors.defaultThreadFactory() is used to create threads.

  • RejectedExecutionHandler handler: the rejection policy of the thread pool

The rejection policy is applied when the workQueue is full and no new thread can be created to handle the task.
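To show how the seven parameters fit together, here is a hedged sketch that passes all of them explicitly. The factory helper namedFactory, the prefix "order-pool" and the sizes are assumptions chosen for the example, not values recommended by the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and sizes are illustrative only.
class NamedPoolDemo {

    /** A simple ThreadFactory that gives every thread a readable name. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),          // bounded workQueue
                namedFactory("order-pool"),             // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (the default policy)
    }

    /** Runs one task and returns the name of the thread that executed it. */
    static String nameOfWorkerThread() {
        ThreadPoolExecutor pool = newPool();
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorkerThread());
    }
}
```

Named threads make thread dumps and logs easier to read, which is the usual reason to supply a ThreadFactory at all.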

5.2 Starting with CAPACITY initialization:

// From java.lang.Integer
@Native public static final int SIZE = 32;

// From ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS; // 1110 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS; // 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS; // 0110 0000 0000 0000 0000 0000 0000 0000

COUNT_BITS is 29, so CAPACITY is (1 << 29) - 1: shifting gives 0010 0000 0000 0000 0000 0000 0000 0000, and subtracting 1 gives 0001 1111 1111 1111 1111 1111 1111 1111, that is 536870911.

The reason for subtracting 1 becomes clear from the following two methods, which extract the run state and the current number of worker threads:

// Get the run state: RUNNING / SHUTDOWN / STOP / TIDYING / TERMINATED
private static int runStateOf(int c) {
    // ~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY;
}

// Get the lower 29 bits, i.e. the number of worker threads
private static int workerCountOf(int c) {
    // CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
    return c & CAPACITY;
}

A mask whose upper three bits are 0 and whose lower 29 bits are 1 makes it very easy to extract the upper three bits and the lower 29 bits of c, namely runState and workerCount. Both values are packed into a single variable called ctl.
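The packing can be tried out in isolation. The sketch below copies the constants into a standalone class (CtlBitsDemo is a made-up name); ctlOf mirrors the small helper of the same name in the JDK source, which the article does not show.

```java
// Standalone illustration of the ctl bit layout; not the JDK class itself.
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // lower 29 bits set

    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    /** Packs a run state (upper 3 bits) and a worker count (lower 29 bits). */
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    /** Keeps only the upper 3 bits. */
    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    /** Keeps only the lower 29 bits. */
    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("ctl bits:     " + Integer.toBinaryString(c));
        System.out.println("is RUNNING:   " + (runStateOf(c) == RUNNING));
        System.out.println("worker count: " + workerCountOf(c));
    }
}
```

Because both fields live in one int, the pool can update state and count together with a single compare-and-set on ctl.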

5.3 Thread pool running status and number of active threads

The RUNNING state can switch to SHUTDOWN (on shutdown()) or to STOP (on shutdownNow()):

SHUTDOWN and STOP are intermediate states. The pool eventually moves to TIDYING once all tasks have stopped and no worker threads remain, and then to TERMINATED.
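The transitions are visible from the outside through the public ExecutorService methods. This is a minimal sketch with invented names (LifecycleDemo and its two helpers); it only checks behavior that the public API exposes, not the internal state constants.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class observing the lifecycle through the public API.
class LifecycleDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown(); // RUNNING -> SHUTDOWN: no new tasks are accepted
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    /** Returns true if the pool reaches TERMINATED after its accepted work is done. */
    static boolean terminatesAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(() -> { }); // accepted while RUNNING
        pool.shutdown();         // already accepted tasks still run in SHUTDOWN
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS) && pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejects after shutdown:    " + rejectsAfterShutdown());
        System.out.println("terminates after shutdown: " + terminatesAfterShutdown());
    }
}
```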

5.4 Thread pool constructor: see above

5.5 execute

The execution process is shown as follows:

Simplified version of the flow chart:

The corresponding annotated source code is as follows:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();

    // Step 1: fewer than corePoolSize workers exist, so try to start a core thread
    if (workerCountOf(c) < corePoolSize) {
        // addWorker(command, core):
        //   command: the Runnable to execute
        //   core = true:  the worker count must stay < corePoolSize
        //   core = false: the worker count must stay < maximumPoolSize
        if (addWorker(command, true))
            return;
        // addWorker failed, so read ctl again
        c = ctl.get();
    }

    // Step 2: the pool is running and the task was queued successfully
    // (this is the case workerCount >= corePoolSize)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check again: if the pool is no longer running, remove the command
        // that was just queued and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no workers at all, start one without a first task so that
        // the queued command is picked up. If workerCountOf(recheck) > 0, an
        // existing worker will take the command from the queue later.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: reached when one of the following holds
    //   case 1: the pool is not running, i.e. isRunning(c) == false
    //   case 2: workerCount >= corePoolSize and workQueue.offer(command) == false
    // If workerCount < maximumPoolSize a new thread is created, otherwise the
    // task is rejected.
    else if (!addWorker(command, false))
        reject(command); // thread creation failed, apply the rejection policy
}

5.5.1 addWorker

An important method often used in execute is addWorker(), which has the effect of adding a new thread to the thread pool.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // Step 1: try to increment workerCount by 1
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); // current run state

        // A worker may only be added in two cases:
        //   case 1: rs == RUNNING
        //   case 2: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
        // When the pool is SHUTDOWN it accepts no new tasks, so:
        //   - if firstTask != null, a new task is being added: return false
        //   - if firstTask == null and the queue is empty, all queued tasks are
        //     done and no worker is needed: return false
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // Try to set workerCount + 1
        for (;;) {
            int wc = workerCountOf(c);
            // Return false if
            //   case 1: wc has reached the maximum capacity, 536870911
            //   case 2: core == true  and wc >= corePoolSize
            //           core == false and wc >= maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS workerCount + 1; on success leave the retry loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // re-read ctl
            // The run state changed, start over from the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // Otherwise the CAS failed because workerCount changed: retry inner loop
        }
    }

    // Step 2: workerCount was incremented, now create and start the worker
    boolean workerStarted = false; // has Thread.start been called?
    boolean workerAdded = false;   // has the worker been added to the pool?
    Worker w = null;               // Worker extends AQS
    try {
        // Each Worker instance creates one thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-check the run state while holding the lock
                int rs = runStateOf(ctl.get());

                // Add the worker to the pool if
                //   case 1: the pool is RUNNING
                //   case 2: the pool is SHUTDOWN and firstTask is null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // t was just created and has not been started, so if it is
                    // already alive something is wrong
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest size the pool has reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // start the thread
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) // the thread was not started, roll back
            addWorkerFailed(w);
    }
    return workerStarted;
}

AddWorker () takes the first thread in the current queue and starts it by calling the start() method

Where thread T is obtained by the following code

 

Look at the Worker constructor and create a thread using the getThreadFactory factory:

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

The run method in the thread calls the runWorker method, as shown in green above

/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

The runWorker and tryTerminate methods in the figure above are highlighted in green.

5.5.2 tryTerminate *

5.5.3 runWorker *

The execution process is as follows:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // Interrupt the worker thread, wt.interrupt(), if both hold:
            //   1> the pool is in STOP, TIDYING or TERMINATED state
            //   2> the worker thread is not yet interrupted
            // Otherwise make sure the interrupt flag is cleared.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

At the heart of the runWorker method is a call to the getTask method, shown in the green box above.
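runWorker surrounds every task.run() call with the beforeExecute and afterExecute hooks, which subclasses may override. The following sketch counts hook invocations; HookedPool and runDemo are names invented for this example, and the failing task is there only to show the Throwable argument.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that records how often the hooks fire.
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(16));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet();
        if (thrown != null) {
            failed.incrementAndGet(); // non-null only for tasks passed to execute()
        }
    }

    /** Runs one normal and one failing task; returns {started, finished, failed}. */
    static int[] runDemo() {
        HookedPool pool = new HookedPool(2);
        pool.execute(() -> { });
        // The exception also reaches the default handler, which prints a stack trace
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] counts = runDemo();
        System.out.println("started=" + counts[0] + " finished=" + counts[1] + " failed=" + counts[2]);
    }
}
```

Tasks passed to submit() behave differently: their exceptions are captured in the returned Future, so afterExecute receives null for them.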

5.5.4 getTask *

private Runnable getTask() {
    // Did the last poll from the blocking queue time out?
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Decrement the worker count and return null if both hold:
        //   1> rs >= SHUTDOWN: the pool is not RUNNING
        //   2> rs >= STOP (queued tasks are no longer processed)
        //      or the blocking queue workQueue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // worker count - 1
            return null;
        }

        int wc = workerCountOf(c);

        // timed says whether this worker is subject to the keep-alive timeout.
        // allowCoreThreadTimeOut defaults to false, meaning core threads never
        // time out; workers beyond corePoolSize always do.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Both conditions must hold for the worker to exit:
        //   case 1: wc > maximumPoolSize (the maximum was lowered at runtime),
        //           or timed == true and the last poll timed out
        //   case 2: wc > 1, or the blocking queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c)) // worker count - 1
                return null;
            continue; // the CAS failed, retry
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // returns null on timeout
                workQueue.take();                                     // blocks while the queue is empty
            if (r != null)
                return r;
            // r == null means the poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
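The effect of the timed flag can be observed from outside the pool. In this sketch allowCoreThreadTimeOut(true) makes even core threads use the timed poll, so an idle pool shrinks to zero threads. The class name, the 50 ms keep-alive and the 5 second polling deadline are assumptions for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; timings are illustrative.
class KeepAliveDemo {

    /** Polls getPoolSize() until it equals expected or the deadline passes. */
    static boolean awaitPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (System.nanoTime() < deadline) {
            if (pool.getPoolSize() == expected) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return pool.getPoolSize() == expected;
    }

    /** Returns true if idle core threads are retired once core timeout is enabled. */
    static boolean coreThreadsTimeOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // idle workers now wait with a timed poll
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            return awaitPoolSize(pool, 0, 5_000);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core threads timed out: " + coreThreadsTimeOut());
    }
}
```

Without the allowCoreThreadTimeOut(true) call the two core threads would block in take() and the pool size would stay at 2.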

7 Execution methods of ThreadPoolExecutor

There are two execution methods, execute() and submit(). The main difference is that submit() returns a Future from which the result of the task can be retrieved, while execute() returns nothing.

Sample code:

public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));

    // execute(): no return value
    executor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("I am an execute");
        }
    });

    // submit(): returns a Future holding the result
    Future<String> future = executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "I am a submit";
        }
    });
    System.out.println(future.get());

    executor.shutdown(); // let the JVM exit once the tasks are done
}

Another difference is that execute() is declared in the top-level Executor interface, while submit() is declared in its sub-interface ExecutorService.

8 Rejection policies of the thread pool

When the task queue in the thread pool is full and another task is added, the system checks whether the number of threads in the current thread pool is greater than or equal to the maximum number of threads in the thread pool. If yes, the system triggers the reject policy of the thread pool.

There are four built-in rejection policies:

  • AbortPolicy: abort policy. The pool throws a RejectedExecutionException to the caller and the task is not executed. This is the default policy.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " + e.toString());
}

  • CallerRunsPolicy: runs the task directly in the thread that called execute()
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

  • DiscardPolicy: Discards incoming tasks
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

  • DiscardOldestPolicy: discards the oldest task (the one at the head of the queue) and then retries the new task
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
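The four built-in policies can be compared with a deliberately saturated pool. The sketch below uses one thread and a queue of one slot, blocks the worker with a latch, and then submits a third task so that the policy is triggered; RejectionDemo and threadThatRanExtraTask are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for comparing rejection policies.
class RejectionDemo {

    /**
     * Saturates a 1-thread, 1-slot pool and submits one extra task under the
     * given policy. Returns the name of the thread that ran the extra task,
     * or null if it never ran.
     */
    static String threadThatRanExtraTask(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> {               // occupies the only worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });           // fills the single queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread().getName())); // rejected
        } catch (RejectedExecutionException e) {
            // thrown by AbortPolicy; the extra task is dropped
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:         " + threadThatRanExtraTask(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy:    " + threadThatRanExtraTask(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardPolicy:       " + threadThatRanExtraTask(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldestPolicy: " + threadThatRanExtraTask(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

With AbortPolicy and DiscardPolicy the extra task never runs, with CallerRunsPolicy it runs on the submitting thread, and with DiscardOldestPolicy it replaces the queued task and later runs on the pool's worker thread.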

8.1 User-defined Rejection Policy

To customize the rejection policy, implement the RejectedExecutionHandler interface and override its rejectedExecution method.

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        1, 3, 10L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(2),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("I am a custom reject policy");
            }
        });

for (int i = 0; i < 6; i++) {
    threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    });
}

9 Interview questions

1. If the current size of the thread pool has not reached the core size (poolSize < corePoolSize), a new thread is added to handle the newly submitted task.

2. If the current size has reached the core size, the newly submitted task is put into the blocking queue to wait for processing: workQueue.offer(command).

3. If the queue is full and poolSize has not reached maximumPoolSize, a new thread is created to process the task.

4. If the queue is full and the current number of threads has reached the upper limit, the pool has reached its capacity and new tasks must be rejected. How they are rejected depends on the pool's saturation policy, the RejectedExecutionHandler.

corePoolSize = 10, maximumPoolSize = 10, queueSize = 10

20 concurrent tasks come in. How many active threads are there?

10. Since corePoolSize equals maximumPoolSize, the pool size is fixed. The 10 core threads are filled first, and the remaining 10 tasks fill the queue.

How many tasks are in the queue?

10. The core threads are full and the queue is full.

What if 21 concurrent tasks come in?

The core threads are full and the queue is full, so the 21st task is handled by the rejection policy.

corePoolSize = 10, maximumPoolSize = 20, queueSize = 10

20 concurrent tasks come in. How many active threads are there?

10. The core threads are full and the queue is full.

21 concurrent tasks come in. How many active threads are there?

11. The core threads and the queue together hold 20 tasks, so one extra thread is created: corePoolSize + 1 = 11.

30 concurrent tasks come in. How many active threads are there?

20. The core threads are full, the queue is full, and the pool grows to maximumPoolSize = 20, so there are 20 active threads.

31 concurrent tasks come in. How many active threads are there?

20. The core threads are full, the queue is full, and the pool has grown to maximumPoolSize = 20. The 31st task is handled by the rejection policy, so there are still 20 active threads.
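The answers above can be checked with a small experiment. This sketch submits tasks that block on a latch, so that every accepted task is either occupying a thread or sitting in the queue, and counts rejections with a custom handler. SaturationDemo and its submit helper are invented names, and the sketch assumes the tasks arrive faster than any of them finishes.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class reproducing the interview scenarios.
class SaturationDemo {

    /** Submits blocking tasks and returns {poolSize, queuedTasks, rejectedTasks}. */
    static int[] submit(int core, int max, int queueSize, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize),
                (task, executor) -> rejected.incrementAndGet()); // count instead of throwing
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected.get() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("10/10/10, 21 tasks: " + Arrays.toString(submit(10, 10, 10, 21)));
        System.out.println("10/20/10, 21 tasks: " + Arrays.toString(submit(10, 20, 10, 21)));
        System.out.println("10/20/10, 30 tasks: " + Arrays.toString(submit(10, 20, 10, 30)));
        System.out.println("10/20/10, 31 tasks: " + Arrays.toString(submit(10, 20, 10, 31)));
    }
}
```

Each printed triple is pool size, queued tasks and rejected tasks, which maps directly onto the questions about active threads, queue contents and the rejection policy.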