Introduction

When tasks run on Java's Executor framework, the saturation strategy comes into play once the thread pool is saturated: the waiting queue is full, the pool has reached its maximum size, and another task is submitted. ThreadPoolExecutor provides four saturation strategies: AbortPolicy, DiscardPolicy, DiscardOldestPolicy, and CallerRunsPolicy.

We looked at their source code in the previous section, so this section verifies how they differ in practice.

Preparing test classes

Start by defining a MyCommand task that receives a string message:

    import java.util.Date;

    public class MyCommand implements Runnable {
        private String name;

        public MyCommand(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            // Print the worker thread, the task name and the start time.
            System.out.println(Thread.currentThread().getName() + " ," + "name: " + name + "," + new Date());
            try {
                // Simulate 5 seconds of work.
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "MyCommand [name=" + name + "]";
        }
    }

Next, write one test class shared by all the experiments. The thread pool has 2 threads and the waiting queue holds 2 tasks, so the pool can hold at most 4 tasks at a time. The fifth task is handled differently depending on the saturation strategy, so we compare the strategies by swapping the handler.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class RejectPolicyTest {
        private final ThreadPoolExecutor exec;

        public RejectPolicyTest() {
            // Pool of 2 threads (core = max = 2) and a bounded queue of 2 waiting tasks.
            exec = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
                    new LinkedBlockingQueue<Runnable>(2));
            // TODO: set a different saturation strategy here, e.g.
            // exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public static void main(String[] args) {
            MyCommand c1 = new MyCommand("c1");
            MyCommand c2 = new MyCommand("c2");
            MyCommand c3 = new MyCommand("c3");
            MyCommand c4 = new MyCommand("c4");
            MyCommand c5 = new MyCommand("c5");
            RejectPolicyTest c = new RejectPolicyTest();
            c.submit(c1);
            c.submit(c2);
            c.submit(c3);
            c.submit(c4);
            c.submit(c5);
        }

        public void submit(Runnable command) {
            System.out.println(Thread.currentThread().getName() + " submit tast..." + command);
            try {
                exec.submit(command);
            } catch (Exception e) {
                // With AbortPolicy, a saturated pool ends up here.
                System.out.println("Exception when submit task:" + e.getMessage());
            }
        }
    }

Strategy 1: Reject with notification

AbortPolicy is the default saturation strategy. It throws the unchecked RejectedExecutionException, which the caller can catch and handle as needed, for example by resubmitting the task. This is a fairly friendly strategy: at least the submitter is notified before the task is discarded.

Since this is the default policy, run the test class prepared above as is. The result is as follows:

    main submit tast... MyCommand [name=c1]
    main submit tast... MyCommand [name=c2]
    main submit tast... MyCommand [name=c3]
    main submit tast... MyCommand [name=c4]
    main submit tast... MyCommand [name=c5]
    Exception when submit task:Task java.util.concurrent.FutureTask@33909752 rejected from java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    pool-1-thread-2 ,name: c2,Sun Dec 22 19:10:56 CST 2019
    pool-1-thread-1 ,name: c1,Sun Dec 22 19:10:56 CST 2019
    pool-1-thread-2 ,name: c3,Sun Dec 22 19:11:01 CST 2019
    pool-1-thread-1 ,name: c4,Sun Dec 22 19:11:01 CST 2019

Test result: after the main thread has submitted four tasks, both workers are busy and the queue is full. When the fifth task is submitted, the thread pool throws a RejectedExecutionException, which the main thread catches and handles.
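One way to act on the exception is to retry after a short pause. The sketch below is only an illustration under stated assumptions: RetryingSubmitter, submitWithRetry, maxAttempts and backoffMillis are hypothetical names introduced here, not part of the JDK or of the test class above, and the tiny pool (one worker, one queue slot) exists only to make the rejection easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RetryingSubmitter {

    /**
     * Hypothetical helper: hands the task to the pool, pausing between attempts.
     * Returns the attempt number that succeeded, or -1 if every attempt was rejected.
     */
    static int submitWithRetry(Executor exec, Runnable task,
                               int maxAttempts, long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                exec.execute(task);
                return attempt;
            } catch (RejectedExecutionException e) {
                // AbortPolicy reported saturation; pause before the next attempt.
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        return -1;
    }

    /** Small scenario: one worker, one queue slot, default AbortPolicy. */
    static String demo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(() -> {            // occupies the only worker until the gate opens
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });        // fills the single queue slot

        int whileSaturated = submitWithRetry(pool, () -> { }, 2, 10L);
        gate.countDown();               // let the pool drain
        int afterDrain = submitWithRetry(pool, () -> { }, 100, 20L);
        pool.shutdown();
        return "whileSaturated=" + whileSaturated
                + ", acceptedAfterDrain=" + (afterDrain > 0);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

A bounded number of attempts keeps the submitter from spinning forever if the pool never recovers; what to do after the last failed attempt (log, persist, drop) is left to the caller.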

Strategy 2: Discard silently

DiscardPolicy silently drops the rejected task: it does nothing, not even throw an exception, so the caller has no idea what happened to the task. This makes tasks hard to track, so I don’t recommend this strategy.

Modify the test class and adjust the policy:

    exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

Test result: five tasks were submitted but only four were executed. The last task was dropped without the caller being notified.

    main submit tast... MyCommand [name=c1]
    main submit tast... MyCommand [name=c2]
    main submit tast... MyCommand [name=c3]
    main submit tast... MyCommand [name=c4]
    main submit tast... MyCommand [name=c5]
    pool-1-thread-2 ,name: c2,Sun Dec 22 19:12:38 CST 2019
    pool-1-thread-1 ,name: c1,Sun Dec 22 19:12:38 CST 2019
    pool-1-thread-2 ,name: c3,Sun Dec 22 19:12:43 CST 2019
    pool-1-thread-1 ,name: c4,Sun Dec 22 19:12:43 CST 2019
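The silence also extends to the Future returned by submit(): because the discarded task never runs, its Future never completes. The following sketch makes that visible with a timed get(). The class name DiscardPolicyDemo, the method discardedFutureTimesOut, the one-worker pool and the 200 ms timeout are all assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DiscardPolicyDemo {

    /** Returns true if the Future of the discarded task does not complete in time. */
    static boolean discardedFutureTimesOut() throws Exception {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupies the only worker until the gate opens.
            exec.submit(() -> { gate.await(); return "running"; });
            // Fills the single queue slot.
            exec.submit(() -> "queued");
            // Pool is saturated: DiscardPolicy drops this task without any signal.
            Future<String> dropped = exec.submit(() -> "dropped");
            try {
                dropped.get(200, TimeUnit.MILLISECONDS);
                return false;   // would mean the task ran after all
            } catch (TimeoutException e) {
                return true;    // nobody will ever complete this Future
            }
        } finally {
            gate.countDown();
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("dropped task timed out: " + discardedFutureTimesOut());
    }
}
```

A caller that invokes get() without a timeout on such a Future would block indefinitely, which is one more reason to be careful with this policy.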

Strategy 3: Discard the longest-waiting task

DiscardOldestPolicy removes the task at the head of the queue, that is, the one that has waited longest, and then resubmits the current task. This policy is unfriendly to the longest-waiting task, which is dropped without ever running.

Modify the test class policy:

    exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

Test result: five tasks were submitted but only four were executed. The longest-waiting task, c3, was discarded without the caller knowing.

    main submit tast... MyCommand [name=c1]
    main submit tast... MyCommand [name=c2]
    main submit tast... MyCommand [name=c3]
    main submit tast... MyCommand [name=c4]
    main submit tast... MyCommand [name=c5]
    pool-1-thread-2 ,name: c2,Sun Dec 22 19:13:10 CST 2019
    pool-1-thread-1 ,name: c1,Sun Dec 22 19:13:10 CST 2019
    pool-1-thread-2 ,name: c4,Sun Dec 22 19:13:15 CST 2019
    pool-1-thread-1 ,name: c5,Sun Dec 22 19:13:15 CST 2019
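If dropping the oldest task is acceptable but doing so invisibly is not, a custom RejectedExecutionHandler can follow the same poll-then-retry steps and record what it evicts. The sketch below is a hypothetical variant, not the JDK class itself: LoggingDiscardOldest, its dropped list and the named() helper are names invented for this example.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class LoggingDiscardOldest implements RejectedExecutionHandler {

    /** Tasks evicted from the queue, in eviction order. */
    final List<Runnable> dropped = new CopyOnWriteArrayList<>();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            return;                             // nothing to retry on a stopped pool
        }
        Runnable oldest = e.getQueue().poll();  // head of the queue = longest-waiting task
        if (oldest != null) {
            dropped.add(oldest);                // remember what was sacrificed
        }
        e.execute(r);                           // retry the new task
    }

    /** Hypothetical helper: a task that waits for the gate and prints as its name. */
    static Runnable named(String name, CountDownLatch gate) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    gate.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public String toString() {
                return name;
            }
        };
    }

    /** One worker, one queue slot: the third task evicts the second. */
    static String demo() {
        LoggingDiscardOldest handler = new LoggingDiscardOldest();
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch gate = new CountDownLatch(1);
        exec.execute(named("first", gate));     // runs on the only worker
        exec.execute(named("second", gate));    // waits in the queue
        exec.execute(named("third", gate));     // saturated: evicts "second"
        gate.countDown();
        exec.shutdown();
        return handler.dropped.toString();
    }

    public static void main(String[] args) {
        System.out.println("dropped: " + demo());
    }
}
```

Recording the evicted task gives the submitter a chance to log, count or reschedule it, which the built-in policy does not offer.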

Strategy 4: Caller runs

CallerRunsPolicy provides a throttling mechanism: it neither discards the task nor throws an exception. Instead, it pushes the task back to the caller, so the thread that submitted the task runs it itself.

    exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Test result: five tasks were submitted. Four were executed by worker threads, and the fifth was executed by the caller (the main thread).

    main submit tast... MyCommand [name=c1]
    main submit tast... MyCommand [name=c2]
    main submit tast... MyCommand [name=c3]
    main submit tast... MyCommand [name=c4]
    main submit tast... MyCommand [name=c5]
    pool-1-thread-2 ,name: c2,Sun Dec 22 19:18:28 CST 2019
    pool-1-thread-1 ,name: c1,Sun Dec 22 19:18:28 CST 2019
    main ,name: c5,Sun Dec 22 19:18:28 CST 2019
    pool-1-thread-1 ,name: c3,Sun Dec 22 19:18:33 CST 2019
    pool-1-thread-2 ,name: c4,Sun Dec 22 19:18:33 CST 2019

Conclusion: CallerRunsPolicy acts as an elastic throttle. When the work queue is full, the next task runs on the thread that submitted it, here the main thread.

While the main thread is running that task, it is busy and cannot submit further tasks. This slows the submission rate and gives the thread pool time to work through the queued tasks.
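The blocking effect can be checked without five-second sleeps. In the sketch below, the overflow task records the thread it runs on; since the rejected task runs inside execute(), the record is already set when execute() returns. CallerRunsDemo, overflowRanOnCaller and awaitQuietly are hypothetical names, and the one-worker pool is an assumption made to keep the scenario small.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {

    /** Returns true if the overflow task ran synchronously on the submitting thread. */
    static boolean overflowRanOnCaller() {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            exec.execute(() -> awaitQuietly(gate));   // occupies the only worker
            exec.execute(() -> { });                   // fills the single queue slot
            // Saturated: the policy runs this task here, before execute() returns.
            exec.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            gate.countDown();
            exec.shutdown();
        }
    }

    /** Waits for the latch, restoring the interrupt flag if interrupted. */
    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on caller: " + overflowRanOnCaller());
    }
}
```

Because the submitter is occupied for the full duration of the overflow task, this policy is a poor fit for callers that must stay responsive, such as an event loop or a request-accepting thread.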

Strategy 5: Caller throttles submission

The first four are saturation strategies of the thread pool itself. The submission rate can also be controlled on the submitting side, throttling submissions so that the pool never saturates. For example, a Semaphore, a synchronization utility that limits how many operations can access a particular resource at the same time, can be used to limit the arrival rate of tasks.

Semaphore.acquire() obtains a virtual permit; if none is available, the calling thread blocks until one is released. If the thread pool buffers tasks in an unbounded queue and nothing limits the number of submissions, memory may run out. In that case, a Semaphore with a fixed number of permits bounds how many tasks are in flight and so controls the submission rate.

The following example combines the MyCommand task defined earlier with a Semaphore to let the caller control task submission:

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;

    /**
     * @title: BoundedExecutor
     * @description: Semaphore controls the task submission rate
     * @since: 2019-12-22
     */
    public class BoundedExecutor {
        private final Executor exec;
        private final Semaphore semaphore;

        public BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }

        public void submitTask(final Runnable command) throws InterruptedException {
            // Block until a permit is available.
            semaphore.acquire();
            try {
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            command.run();
                        } finally {
                            // Return the permit once the task has finished.
                            System.out.println("release...");
                            semaphore.release();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // The task never started, so return its permit immediately.
                System.out.println("Queue is full, task rejected");
                semaphore.release();
            }
        }

        public static void main(String[] args) {
            // The cached pool itself is unbounded, but the Semaphore lets only two tasks run at a time.
            Executor exec = Executors.newCachedThreadPool();
            BoundedExecutor b = new BoundedExecutor(exec, 2);
            MyCommand c1 = new MyCommand("c1");
            MyCommand c2 = new MyCommand("c2");
            MyCommand c3 = new MyCommand("c3");
            MyCommand c4 = new MyCommand("c4");
            MyCommand c5 = new MyCommand("c5");
            try {
                b.submitTask(c1);
                b.submitTask(c2);
                b.submitTask(c3);
                b.submitTask(c4);
                b.submitTask(c5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Task execution result:

    pool-1-thread-2 ,name: c2,Mon Dec 15 16:20:17 CST 2019
    pool-1-thread-1 ,name: c1,Mon Dec 15 16:20:17 CST 2019
    release...
    release...
    pool-1-thread-1 ,name: c4,Mon Dec 15 16:20:22 CST 2019
    pool-1-thread-3 ,name: c3,Mon Dec 15 16:20:22 CST 2019
    release...
    release...
    pool-1-thread-1 ,name: c5,Mon Dec 15 16:20:27 CST 2019
    release...

Analysis: the Semaphore allows at most two tasks in flight at a time, and each permit is released when its task completes. This effectively controls the task submission rate.
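Blocking in acquire() without limit is not always acceptable for the submitter. A possible variant, sketched here under assumptions, waits for a permit only for a bounded time using Semaphore.tryAcquire(timeout, unit) and reports failure instead of blocking. TimedBoundedExecutor, trySubmit and the timeouts in demo() are hypothetical and not part of the example above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class TimedBoundedExecutor {
    private final Executor exec;
    private final Semaphore permits;

    TimedBoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.permits = new Semaphore(bound);
    }

    /** Returns false if no permit became available in time or the pool rejected the task. */
    boolean trySubmit(Runnable command, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (!permits.tryAcquire(timeout, unit)) {
            return false;                       // still saturated after waiting
        }
        try {
            exec.execute(() -> {
                try {
                    command.run();
                } finally {
                    permits.release();          // return the permit when the task ends
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            permits.release();                  // task never started: give the permit back
            return false;
        }
    }

    /** One permit: the second submission times out, the third succeeds after a release. */
    static String demo() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        TimedBoundedExecutor bounded = new TimedBoundedExecutor(pool, 1);
        CountDownLatch gate = new CountDownLatch(1);
        boolean first = bounded.trySubmit(() -> {
            try {
                gate.await();                   // holds the only permit until the gate opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 50, TimeUnit.MILLISECONDS);
        boolean second = bounded.trySubmit(() -> { }, 50, TimeUnit.MILLISECONDS);
        gate.countDown();                       // first task finishes and releases its permit
        boolean third = bounded.trySubmit(() -> { }, 2, TimeUnit.SECONDS);
        pool.shutdown();
        return first + "," + second + "," + third;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Returning a boolean leaves the decision to the submitter: it can retry later, shed the task, or report the overload upstream.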

Takeaways

Only AbortPolicy and CallerRunsPolicy are friendly to the task submitter. The other two silently drop a task, which is bad for the submitter.

A compromise is to let the caller control the submission rate: size a semaphore according to the thread pool's configuration and use it to gate submissions, so that the pool is not flooded with tasks.