In Java programming process, dealing with concurrent and multi-threaded environment can reflect the basic literacy of programmers. From basic synchronized to ReentrantLock and ReentrantReadWriteLock, to StampedLock, and finally distributed locks in distributed scenarios, from Thread to Thread pool, Others include CountDownLatch, CyclicBarrier, and Semaphore.

1 Thread pool application

In the actual application scenario, the application of thread pool is quite extensive. In addition to the four fixed thread pools in JDK, there are many open source frameworks to provide thread pool creation methods, such as Guava, Apache, HuTools. However, ThreadPoolExecutor is usually used to configure thread pool parameters based on actual business scenarios. In addition, in order to avoid frequent creation of the thread pool during development, the thread pool will be declared as a bean and given to Spring for management, which is convenient for the management of the thread pool and the invocation of business scenarios in the project.

ThreadPoolExecutor is used to create a thread pool:

// Thread factory
ThreadFactory factory = (runnable) ->{
    return new Thread(runnable, "demo-server-" + runnable.hashCode());
};
// Create a thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.3000,TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(300),  factory,
        RejectPolicy.CALLER_RUNS.getValue());
Copy the code

When creating a batch task, just put the task into the thread pool. If concurrency control is needed, you can pass CountDownLatch and Semaphore into the thread for access control, and close the thread pool after the task is executed. For CPU – and IO – intensive tasks, experience can be used to control the number of core threads and the maximum number of threads in the thread pool to maximize the utilization of system resources.

As shown in the figure above, assuming that there are four tasks to execute and the time taken is as shown in time line, if the execution results of each task need to be obtained, The Task class needs to inherit Callable instead of Runnable, and the method for submitting the Task needs to be changed from execute to submit to get the Future object returned by each Task. There is a problem. If we put the results of the execution into a collection and then iterate to get the results, the future.get() method blocks, so we can only get the results of the execution in the order we put the tasks. In fact, we should obtain results in the order in which tasks are completed, not in the order in which they are executed. This is where the CompletionService comes out.

class Task implements Callable {
  // omit the code
}
// Place the future into the resultList
List<Future> resultList = new ArrayList<>();
Future fut = executor.submit(new Task());
resultList.add(fut);
// Iterate through the resultList to get the result of the execution
Copy the code

The improved approach is to wrap executors with CompletionService:

CompletionService service = new ExecutorCompletionService(executor);
List<Future> resultList = new ArrayList<>();
// If you don't use list to receive results, you can loop for the number of tasks and block to get results using service.take()
Future fut = service.submit(new Task());
resultList.add(fut);
Copy the code

CompletionService can return results in the order in which the tasks were completed because it internally maintains a BlockingQueue

> completionQueue that stores completed tasks. Using this form, we can obtain results according to the order in which tasks are completed, further shortening the time and improving the efficiency of the program.

2 Asynchronous Programming

Thread pools can solve most problems in a project, but they are not a panacea. Some problems can be more complex to implement, such as the following requirements:

Using Thread pools or threads to communicate with each other in this case is already too complicated, so the CompletableFuture needs to be introduced to solve this complex problem. Take a look at the corresponding code, and then analyze and explain:

1 Different colors can represent different tasks, which can be executed in parallel.

2 Tasks must be executed in sequence.

2.1 Code Examples

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
     log.info("Wake up time");
     log.info("Prepare breakfast");
     log.info("In the shower...");
     sleep(RandomUtil.randomInt(5.10));
     return "Wash and finish.";
 });

 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
     sleep(RandomUtil.randomInt(1.2));
     log.info("In heated milk...");
     sleep(RandomUtil.randomInt(2.5));
     log.info("In the heated bread...");
     sleep(RandomUtil.randomInt(2.5));
     return "Breakfast done.";
 });

 CompletableFuture<String> result = f1.thenCombine(f2, (res1, res2) -> {
     String detail = StrUtil.format("{} -> {}, ready for dinner.", res1, res2);
     log.info(detail);
     sleep(RandomUtil.randomInt(1.2));
     return "Go to work";
 });
 // Get the result
 log.info(result.join());

 CompletableFuture<String> f3 =
         CompletableFuture.supplyAsync(() -> {
             int t = RandomUtil.randomInt(1.3);
             sleep(t);
             return "No. 53 Bus";
         });

 CompletableFuture<String> f4 =
         CompletableFuture.supplyAsync(() -> {
             int t = RandomUtil.randomInt(0.4);
             sleep(t);
             return "No. 89 Bus";
         });

CompletableFuture<String> f6 =
         CompletableFuture.supplyAsync(() -> {
             int t = RandomUtil.randomInt(1.2);
             sleep(t);
             return "Bus No. 80";
         });

 CompletableFuture<String> re3 = CompletableFuture.anyOf(f3, f4, f6).thenApply(res -> {
     log.info("Seat on {}", res);
     sleep(RandomUtil.randomInt(3));
     int i = RandomUtil.randomInt(10);
     if (i < 2) {
         throw new RuntimeException("Suddenly I wasn't feeling well and went to the hospital.");
     } else if (i < 4) {
         throw new RuntimeException("Suddenly I don't want to go to work. I'm traveling.");
     } else if (i < 6) {
         throw new RuntimeException("I took the bus backwards, so I asked for leave.");
     }

     return "Arrive at the office and start work.";
 }).exceptionally(ex -> {
     return  "ERROR:: " + ex.getMessage();
 }).handle((res, ex) -> {
        if (ex == null) {
             log.info("handle {}", res);
        } else {
             log.error("error {}", ex.getMessage());
        }
        return "The end";
 });

 try {
     log.info("Final result {}", re3.get());
 } catch (InterruptedException e) {
     e.printStackTrace();
 } catch (ExecutionException e) {
     e.printStackTrace();
 }

Copy the code

The final running result of the program is shown as follows, which basically meets the requirements:

In asynchronous programming, all relationships can be described as serial, parallel, and convergent. Convergent means combining serial or parallel results.

// The Consumer interface accepts an argument and does not return a result
Consumer<String> consumer = (node) -> { System.out.println(node); };
consumer.accept("Consume a parameter");

The Supplier interface returns a result without any input
Supplier<String> supplier = () -> {return "34"; }; System.out.println(supplier.get());// Function interface that takes arguments and returns a result, converting a string to a number
 Function<String,Integer> function = (node) ->{return Integer.valueOf(node)};
function.apply("34");
// There is also BiFunction BiConsumer, which simply receives two parameters
BiConsumer<String,String> consumer = (node1, node2) -> {
    System.out.println(node1 + ":" + node2);
};
consumer.accept("44"."55");
// Calculate the sum of two numbers
BiFunction<String,String,Integer> function = (node1,node2) ->{returnInteger.valueOf(node1) + Integer.valueOf(node2); }; function.apply("34"."45");

Copy the code

2.2 Serial Relationship

In asynchronous programming, all relationships can be described as serial, parallel, and convergence. Convergence refers to the combination of serial or parallel results: ThenApply, thenAccept, thenRun and thenCompose are the serial relationships expressed in CompletableFuture:

ThenApply needs to pass in a Function that takes the result of the previous step as an input parameter and returns it computed

// Produce a string asynchronously and return the result with a suffix. Produce string is an asynchronous task, and suffixing is used by the thread that produced the string. If suffixing also requires another thread, thenApplyAsync should be used.
CompletableFuture<String> temp = CompletableFuture.supplyAsync(() -> {
       return "Produce a string";
});
// 
CompletableFuture<String> result = temp.thenApply(node -> {
    return node + ": suffixed";
});
// Last output result [produce a string: add suffix]
System.out.println(result.get()); 
// The difference between XXX and xxxAsync is whether to use a new thread pool
public <U> CompletableFuture<U> thenApply(
     Function<? super T,? extends U> fn) {
     return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}
Copy the code

ThenAccept needs to pass in a Consumer that takes the result of the previous step as an input, but doesn’t need to return the result

// Also, the asynchronous output can be thenAcceptAsync
CompletableFuture<Void> future = temp.thenAccept(node -> {
     System.out.println("Already generated ->" + node); 
});
Copy the code

ThenCompose also needs to pass in a Function that takes the result of the previous step as an input and returns the calculation

ThenApply does not require a type of value to be returned. ThenCompose must return a CompletionStage. And the CompletionStage is the parent class of the CompletableFuture, so we just return a CompletableFuture object, otherwise the same as thenApply.
public <U> CompletableFuture<U> thenCompose(
     Function<? super T, ? extends CompletionStage<U>> fn) {
     return uniComposeStage(null, fn);
}
// Asynchronously returns a CompletableFuture object
CompletableFuture<String> compose = temp.thenCompose(node ->{
    return CompletableFuture.supplyAsync(() ->{
        return node + "The results";
    });
});
Copy the code

In this example, thenApply is used to select which bus will arrive first and then take to go to work. Here, there are three buses that can arrive, so anyOf() is used. Similarly, allOf can be used to process after all the conditions are met. ThenApply is executed by an idle thread in the CompletableFuture. If another thread pool is needed to execute thenApplyAsync, which means asynchronous execution.

// Only one return is required
CompletableFuture<Object> anyOf(CompletableFuture
       ... cfs)
// All must be returned
CompletableFuture<Void> allOf(CompletableFuture
       ... cfs)
Copy the code

ThenRun requires passing in a Runnble object for asynchronous processing, which I’ll skip over here.

2.3 Parallelism

The expression of parallelism includes thenCombine, thenAcceptBoth, runAfterBoth:

ThenCombine requires passing in two parameters, a CompletionStage object and a BiFunction object, and then passing the results of both tasks to BiFunction for processing.

// thenCombine is used to combine the results of two asynchronous tasks
CompletableFuture<String> result = f1.thenCombine(f2, (res1, res2) -> {
     String detail = StrUtil.format("{} -> {}, ready for dinner.", res1, res2);
     log.info(detail);
     sleep(RandomUtil.randomInt(1.2));
     return "Go to work";
 });
Copy the code

ThenAcceptBoth and runAfterBoth should be able to guess:

ThenAcceptBoth requires passing in two parameters, a CompletionStage object and a BiConsumer object, and passing the results of both tasks to BiConsumer for processing, with no results returned.

RunAfterBoth takes two arguments, a CompletionStage object and a Runnable object, and does not accept the results of either, directly processing the asynchronous task.

2.4 Aggregation Relationship:

Aggregation relations also include AND and or relations. And can be processed only after both of them are executed, and OR can be realized only when one of them meets the conditions. We’ve already mentioned thenCombine in expressing parallelism, so here’s an introduction to applyToEither:

ThenCombine, thenAcceptBoth, runAfterBoth # The API for aggregations or applyToEither, acceptEither, runAfterEitherCopy the code

ApplyToEither requires passing in two arguments, a CompletionStage object and a Function object, and then passing any result of the first execution to Function for processing and returning the result

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
    return "f1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
    return "f2";
});
CompletableFuture<String> either = f1.applyToEither(f2, (node) -> {
    return node;
});

Copy the code

AcceptEither requires passing in two arguments, a CompletionStage object and a Consumer object, and passing any result that completes first to the Consumer for processing. No result is returned

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
    return "f1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
    return "f2";
});
CompletableFuture<String> either = f1.applyToEither(f2, (node) -> {
    System.out.println("The final result is: :" + node);
});

Copy the code

2.5 Exception Handling:

Exception handling can be added at any step after an asynchronous operation, intercepting the corresponding exception and wrapping it, and finally using handle or whenComplete.

// exceptionally、 whenComplete、 handle
// The difference between BiFunction and BiConsumer is whether to return the result
CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn)
        
CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action)        

Copy the code

2.6 Asynchronous Processing:

Finally explain CompletableFuture supplyAsync, this should be in the most began to speak, the thought is not important, finally to simple the way, this similar to functional programming, whether you need a return value difference between the two.

# supplyAsync supplyAsync = supplyAsync; # supplyAsync = supplyAsyncCopy the code