(The source code is easier to read with your phone in landscape mode.)

Questions

(1) How can a hand-written thread pool support tasks with return values?

(2) What happens if an exception is thrown while a task is executing?

Introduction

In the last chapter we wrote a thread pool ourselves, but it does not support tasks with return values. Can we add that ourselves? Of course we can. Today we will implement a thread pool that supports tasks with return values.

Review

First, let’s review the thread pool from the previous chapter:

(1) It contains four elements: the number of core threads, the maximum number of threads, the task queue and the rejection policy;

(2) It can execute tasks with no return value;

(3) It cannot handle tasks with return values;

(4) It cannot handle exceptions thrown during task execution (an exception thrown inside a thread does not propagate out of that thread).

So, can we add these two missing capabilities on top of what we already have? Let’s give it a try!

What is the difference between a task with a return value and a task without one?

The obvious answer is that one returns a value and the other does not. In pseudocode:

// Task with no return value
threadPool.execute(() -> {
    System.out.println(1);
});

// Task with a return value
SomeClass result = threadPool.execute(() -> {
    System.out.println(1);
    return 1;
});
Object value = result.get();

A task with no return value is submitted and forgotten. The main thread does not care whether it completes or whether it throws an exception. It just hands the task to the thread pool and cares about nothing else.

A task with a return value is different. The main thread first submits the task to the thread pool, but it needs the result of the task, so at some point it must wait for the task to complete in order to get that result.

So why not simply wait for the task to complete inside execute()? Because then the program is effectively serial. It would be better to run the task directly in the main thread and save the cost of thread switching.

The reason for the two-step approach is that the main thread does not necessarily need the return value immediately. It only fetches the value when it needs it, so it can do other useful work between submitting the task and getting the return value.

Therefore, there is no need to block when the task is submitted, only when get() is called. If the task has already finished when get() is called, there is no need to block at all. If it has not finished, get() must block until the task completes and the return value is available.
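
To make the submit-then-get pattern concrete, here is a minimal sketch. It uses the JDK’s own ExecutorService and Future from java.util.concurrent as a stand-in, not the hand-written pool from this series. The class name SubmitThenGetSketch, the 200 ms sleep and the numbers are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it uses the JDK pool, not MyThreadPoolExecutor
class SubmitThenGetSketch {

    // Submits a slow task, does unrelated work, and only then blocks on get()
    static int submitWorkThenGet() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> slowTask = () -> {
                Thread.sleep(200); // simulate a slow computation
                return 42;
            };
            // submit() returns immediately; the task runs on a pool thread
            Future<Integer> future = pool.submit(slowTask);

            // The caller is free to do other work in the meantime
            int local = 0;
            for (int i = 1; i <= 10; i++) {
                local += i;
            }

            // get() blocks only if the task has not finished yet
            return future.get() + local;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitWorkThenGet());
    }
}
```

submit() comes back right away, the loop runs while the task is still sleeping, and only future.get() may block.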

Implementation analysis

First of all, for tasks with no return value we use the Runnable functional interface directly. Is there a ready-made interface for tasks with a return value? Yes, the Callable interface, whose call() method returns a value.

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
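
As a quick illustration of the difference, the sketch below defines one lambda of each kind. The class CallableVsRunnableSketch and its helper callNow() are hypothetical names used only for this example; callNow() simply runs the task on the current thread.

```java
import java.util.concurrent.Callable;

// Hypothetical demo class, not part of the thread pool
class CallableVsRunnableSketch {

    // Runs a Callable on the current thread and returns its value
    static <V> V callNow(Callable<V> task) throws Exception {
        return task.call();
    }

    public static void main(String[] args) throws Exception {
        // Runnable: no return value, and run() cannot throw checked exceptions
        Runnable noResult = () -> System.out.println("no return value");
        noResult.run();

        // Callable: returns a value, and call() may throw a checked exception
        Callable<Integer> withResult = () -> {
            Thread.sleep(10); // allowed because call() declares "throws Exception"
            return 1;
        };
        System.out.println(callNow(withResult));
    }
}
```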

Second, submitting a task with a return value has to hand back something that can be used in the future to retrieve the result of the task. In effect this is a new capability of the task: we wrap the task so that it gains the ability to return a value.

public interface Future<T> {
    T get();
}

Third, we need to add a new capability to the existing thread pool. Following the single responsibility principle, we define a new interface to carry this capability.

public interface FutureExecutor extends Executor {
    <T> Future<T> submit(Callable<T> command);
}

Then, we need a new kind of task that can both be executed like the old tasks (the run() method) and return a value (the get() method). So we create a “future task” that wraps the submitted task and gives it the ability to return a value.

public class FutureTask<T> implements Runnable, Future<T> {

    // The real task
    private Callable<T> task;

    public FutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        // filled in below
    }

    @Override
    public T get() {
        return null; // placeholder, filled in below
    }
}

Finally, we simply extend the existing thread pool: wrap the submitted task as a “future task”, execute it the same way as any other task, and return the future task.

Following the open/closed principle, we do not modify the original code at all. We add the new capability by extending it with a new subclass.

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {

    public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // Wrap the submitted task as a future task
        FutureTask<T> futureTask = new FutureTask<>(task);
        // Execute it the same way as any other task
        execute(futureTask);
        // Return the future task so the caller can fetch the result later
        return futureTask;
    }
}

Now that the overall logic is fairly clear, the most critical part remains: how to implement the two capabilities of the “future task”.

Future tasks

Future tasks have two abilities: the ability to perform the real task and the ability to retrieve the return value in the future.

public class FutureTask<T> implements Runnable, Future<T> {

    @Override
    public void run() {
        // capability 1: perform the real task
    }

    @Override
    public T get() {
        return null; // capability 2: retrieve the return value in the future
    }
}

First of all, let’s be clear that the task is executed in the thread pool while the return value is retrieved in the main thread. These are two different threads, and we cannot tell which of run() and get() will happen first.

Second, if run() is executed before get(), we need to tell get() that the task has finished. So we need a state to signal this, and a variable to hold the return value of the task.

/**
 * State of task execution: 0 not started, 1 completed normally, 2 completed with an exception
 */
private AtomicInteger state = new AtomicInteger(NEW);
private static final int NEW = 0;
private static final int FINISHED = 1;
private static final int EXCEPTION = 2;

/**
 * Result of the task execution (holds the exception if the task failed)
 */
private Object result;

Third, if get() is executed before run(), it must block and wait for run() to complete before it can return the value. So we need to save the caller (the main thread): get() blocks it with park(), and run() wakes it with unpark() once it completes, so that it can fetch the return value.

// The caller thread, i.e. the thread that blocks in get()
private AtomicReference<Thread> caller = new AtomicReference<>();
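
The park/unpark behavior this design relies on can be tried in isolation. The following is a small sketch that is not part of the thread pool; the class and method names are invented for the example. It shows two things: an unpark() issued before park() leaves a permit behind, so the park call returns immediately, and a parked thread should re-check its condition in a loop because park() may also return without a matching unpark().

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the thread pool
class ParkUnparkSketch {

    // unpark() first: the permit is already available, so parking returns at once
    static boolean unparkBeforePark() {
        LockSupport.unpark(Thread.currentThread());
        long start = System.nanoTime();
        // Without a permit this would wait for up to 5 seconds
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMs < 1000;
    }

    // park() first: a helper thread sets a flag and wakes the waiter later
    static boolean parkUntilWoken() {
        Thread waiter = Thread.currentThread();
        AtomicBoolean done = new AtomicBoolean(false);
        Thread helper = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.set(true);             // publish the condition first
            LockSupport.unpark(waiter); // then wake the waiter
        });
        helper.start();
        while (!done.get()) {
            LockSupport.park(); // may return spuriously, so re-check the flag
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("unpark before park returned quickly: " + unparkBeforePark());
        System.out.println("woken by helper thread: " + parkUntilWoken());
    }
}
```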

Next, let’s look at the logic of the run() method: it executes the real task, changes the state to finished, saves the return value of the task, and wakes up the caller thread if one has been saved.

@Override
public void run() {
    // If the state is not NEW, the task has already been executed; return directly
    if (state.get() != NEW) {
        return;
    }
    try {
        // Execute the task
        T r = task.call();
        // CAS the state to FINISHED
        // If the update succeeds, assign r to result
        // If it fails, the state is no longer NEW, i.e. the task has already been executed
        if (state.compareAndSet(NEW, FINISHED)) {
            this.result = r;
            // finish() must be called inside the successful state update; see the analysis in get() below
            finish();
        }
    } catch (Exception e) {
        // If the CAS to EXCEPTION succeeds, assign e to result
        // If it fails, the state is no longer NEW, i.e. the task has already been executed
        if (state.compareAndSet(NEW, EXCEPTION)) {
            this.result = e;
            // finish() must be called inside the successful state update; see the analysis in get() below
            finish();
        }
    }
}

private void finish() {
    // Check whether a caller has been saved; if so, wake it up
    // (the caller enters the blocked state when it calls get())
    for (Thread c; (c = caller.get()) != null;) {
        if (caller.compareAndSet(c, null)) {
            LockSupport.unpark(c);
        }
    }
}

Finally, let’s look at the get() method. If the task has not finished yet, it blocks and waits; if the task has already finished, it simply returns the result. There is one more case: run() may be executing at the same time as get(), so get() re-checks the value of state at every step.

@Override
public T get() {
    int s = state.get();
    // If the task has not finished, decide whether the current thread needs to block
    if (s == NEW) {
        // Whether the caller thread has been recorded
        boolean marked = false;
        for (;;) {
            // Re-read the state
            s = state.get();
            if (s > NEW) {
                // A state greater than NEW means the task is done; leave the loop
                break;
            } else if (!marked) {
                // The CAS on caller and the park() call must be two separate steps;
                // park() must not run immediately after a successful CAS.
                // Try to record the caller thread.
                // Imagine a breakpoint stopped here: the state is NEW and run() executes to the end.
                // It does not call unpark() in finish() because caller is still empty.
                // When the breakpoint is released, the CAS on caller succeeds, but the next pass
                // of the loop sees that the state has changed and breaks out at if (s > NEW),
                // because finish() is only called after the state has been modified.
                marked = caller.compareAndSet(null, Thread.currentThread());
            } else {
                // The caller thread has been recorded.
                // Imagine a breakpoint stopped here: the state is NEW and run() executes to the end.
                // Because caller is already set, finish() calls unpark().
                // When the breakpoint is released, park() does not block, because unpark() hands out a permit:
                // if unpark() runs first, the next park() on that thread returns immediately.
                LockSupport.park();
            }
        }
    }

    if (s == FINISHED) {
        return (T) result;
    }
    throw new RuntimeException((Throwable) result);
}

In our implementation, if an exception is thrown during task execution, it is stored in result and handed back to the main thread: get() rethrows it wrapped in a RuntimeException, so the main thread receives the exception and can handle it accordingly.
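
Two points are worth trying out in a standalone sketch. The first is how a caller handles the failure: a plain try/catch around get(), with getCause() giving back the original exception. The second is a subtlety in run() as written above: the state is switched before result is assigned, so a get() that observes the new state in that brief window could read result too early. The variant below is one possible way to close that window, not the implementation from this article. It assumes an extra intermediate state, named COMPLETING here (a name that does not appear in the original code), and supports a single waiting caller; the class name SketchFutureTask and the demo methods are also made up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical variant of the future task, for illustration only
class SketchFutureTask<T> implements Runnable {
    private static final int NEW = 0;
    private static final int COMPLETING = 1; // assumed extra state, not in the article
    private static final int FINISHED = 2;
    private static final int EXCEPTION = 3;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private final AtomicReference<Thread> caller = new AtomicReference<>();
    private final Callable<T> task;
    private Object result; // made visible by the state.set(...) that follows the write

    SketchFutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        if (state.get() != NEW) {
            return;
        }
        Object outcome;
        int finalState;
        try {
            outcome = task.call();
            finalState = FINISHED;
        } catch (Exception e) {
            outcome = e;
            finalState = EXCEPTION;
        }
        // Claim the right to publish, store the outcome, then expose the final state
        if (state.compareAndSet(NEW, COMPLETING)) {
            result = outcome;
            state.set(finalState);
            Thread c = caller.getAndSet(null);
            if (c != null) {
                LockSupport.unpark(c);
            }
        }
    }

    @SuppressWarnings("unchecked")
    public T get() {
        boolean marked = false;
        int s;
        while ((s = state.get()) < FINISHED) {
            if (s == COMPLETING) {
                Thread.yield(); // the outcome is being stored and will be ready very soon
            } else if (!marked) {
                marked = caller.compareAndSet(null, Thread.currentThread());
            } else {
                LockSupport.park(); // woken by run(), or returns at once if unpark() came first
            }
        }
        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

    // Normal completion: the value travels from the worker thread to the caller
    static int demoValue() {
        SketchFutureTask<Integer> f = new SketchFutureTask<>(() -> 21 * 2);
        new Thread(f).start();
        return f.get();
    }

    // Failure: the caller catches the wrapper and inspects the original exception
    static String demoFailure() {
        SketchFutureTask<Integer> f = new SketchFutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        new Thread(f).start();
        try {
            f.get();
            return "no exception";
        } catch (RuntimeException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demoValue());
        System.out.println(demoFailure());
    }
}
```

Because result is written before the final state is published through the AtomicInteger, a caller that sees FINISHED or EXCEPTION is guaranteed to see the matching result.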

Well, that is the complete implementation. I hope it all made sense.

Test case

Finally, here is the test code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        // Submit 100 tasks; those that fit neither a thread nor the queue are discarded
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        // Fetch the return value of each task
        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}

Output:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9

Conclusion

(1) A task with a return value is implemented by wrapping it as a future task, which can both be executed like any other task and return a value in the future;

(2) The exception from a failed task and the return value of a successful task are handed to the main thread through the same result field, and the state tells which of the two it is;

(3) Our implementation follows the single responsibility principle, the open/closed principle and other design principles, and does not intrude on the original code at all.

Bonus

These two chapters are all we will write about the hand-written thread pool. Next we will start analyzing the source code of the JDK’s native thread pool.

If you need the complete source code of the hand-written thread pool, follow my public account “Tongge read source code” and reply “MyThreadPool” (without the quotation marks) there to get it. The keyword is case-sensitive, so be sure to type it exactly.
