CompletableFuture

  • Create an asynchronous calculation and get the result
  • Use non-blocking operations to increase swallowing
  • Design and implement asynchronous apis
  • Use synchronous apis in an asynchronous manner
  • Pipelining and merging two or more asynchronous operations
  • Handles the completion state of asynchronous operations

The status quo

It is common to wait for an SQL execution to complete before continuing to execute the next SQL, but the two SQL statements themselves are unrelated and can be executed at the same time. We want to be able to process both SQL at the same time, rather than wait for one SQL to complete and then move on to the next.

This can be extended, in many tasks, we need to perform two tasks, but the two tasks are not related to each other, we also want the two tasks to be executed at the same time, and then the execution results can be aggregated.

Future

The function of the Future

The Future submits a Callable task to a thread pool, which starts other threads in the background to execute it, and then calls the get() method to get the result

private void test(a) {
  ExecutorService executor = Executors.newCachedThreadPool();
  Future<Integer> future = executor.submit(() -> sleep(1));
  try {
    Integer integer = future.get(3, TimeUnit.SECONDS);
    System.out.println(integer);
  } catch (InterruptedException e) {
    // The current line was interrupted while waiting
    e.printStackTrace();
  } catch (ExecutionException e) {
    // Task execution exception
    e.printStackTrace();
  } catch (TimeoutException e) {
    / / timeoute.printStackTrace(); }}private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 1;
}
Copy the code

The problem with this approach is that if the sleep executes for more than 3 seconds, the Future will not get the result. Of course, the Future provides a parameterless GET method that can wait for results. However, it is recommended to use the get method with a timeout parameter and to define how to handle the timeout.

Features that Future does not have

  • Combine two asynchronous computations into one that is independent of each other, while the second depends on the result of the first.
  • Wait for all tasks in the Future collection to complete.
  • Just wait for the fastest finished task in the Future collection to complete and return its result.
  • Perform a Future task programmatically.
  • The Future’s completion event is handled by being notified when the Future’s completion event occurs and being able to use the result of the Future’s calculation for the next action, rather than simply blocking and waiting for the result of the action.

CompletableFuture

  • Provide asynchronous apis
  • Synchronous variation step
  • The completion event of an asynchronous operation is handled in a reactive manner

synchronous

When a method is called, the caller waits while the called is running until the called returns. The caller retrieves the value returned by the called and continues running. Even if the caller and the called are not running in the same thread, the caller still needs to wait for the called to finish before running, which is called blocking invocation.

asynchronous

The asynchronous API call returns directly, handing off the computation to another thread. The other threads return the results to the caller after they have finished executing.

Using asynchronous apis

public void test(a){
  CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  new Thread(() -> {
    int sleep = sleep(1);
    completableFuture.complete(sleep);
  }).start();
  CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
  new Thread(() -> {
    int sleep = sleep(2);
    completableFuture1.complete(sleep);
  }).start();
  Integer integer = null;
  Integer integer1 = null;
  try {
    integer = completableFuture.get();
    integer1 = completableFuture1.get();
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
  System.out.println(integer + "....CompletableFuture.." + integer1);

  Instant end = Instant.now();

  Duration duration = Duration.between(start, end);
  long l = duration.toMillis();
  System.err.println(l);
}	

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

  return timeout;
}
Copy the code

Asynchronous processing

The problem with the above code is that if an exception occurs in a thread, how can it be detected in an external call and handled at the same time? Normally, if an exception occurs in a thread, it will be blocked directly in the thread, and eventually the thread will be killed, so the GET method will always block.

Instead of using the get() method, use the GET method with a timeout parameter and pass the exception back to the caller within the thread.

new Thread(() -> {
  try {
    int sleep = sleep(2);
    completableFuture1.complete(sleep);
  } catch (Exception e) {
    completableFuture1.completeExceptionally(e);
  }
}).start();
Copy the code

completableFuture1.completeExceptionally(e); Pass the exception, which will be caught in ExecutionException and then processed.

try {
  integer = completableFuture.get();
  integer1 = completableFuture1.get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
}
Copy the code

Example:

public void test(a){
  CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  new Thread(() -> {
    try {
      throw new RuntimeException("Intentionally thrown exception...");
    } catch (Exception e) {
      completableFuture.completeExceptionally(e);
    }
  }).start();
  Integer integer = null;
  try {
    integer = completableFuture.get(3, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  } catch (TimeoutException e) {
    e.printStackTrace();
  }
  System.out.println(integer + "....CompletableFuture.." );
  Instant end = Instant.now();
  Duration duration = Duration.between(start, end);
  long l = duration.toMillis();
  System.err.println(l);
}
Copy the code

The following exceptions will be received:

Java. Util. Concurrent. ExecutionException: Java. Lang. RuntimeException: the exception thrown intentionally... at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at com.example.demo.me.sjl.service.UserService.test(UserService.java:92)
	at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at ....
Copy the code

Use factory methodsupplyAsyncCreate CompletableFuture

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1));
Copy the code

It is more elegant and concise than the new approach, and does not require explicit new Thread creation. By default, it is run by a thread of execution in the ForkJoinPoll pool. It also provides overloaded methods to specify executors.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
  return asyncSupplyStage(asyncPool, supplier);
}
Copy the code

How to determine the default number of threads:

  • If the configuration the system attribute java.util.concurrent.ForkJoinPool.com mon. Parallelism is take the value, converted to int as the number of threads

    String pp = System.getProperty
                    ("java.util.concurrent.ForkJoinPool.common.parallelism");
    if(pp ! =null)
                    parallelism = Integer.parseInt(pp);
    Copy the code
  • If this value is not configured, runtime.geTruntime ().availableProcessors() is used as the number of threads

    if (parallelism < 0 && // default 1 less than #cores
                (parallelism = Runtime.getRuntime().availableProcessors() - 1) < =0)
                parallelism = 1;
    Copy the code

    Parallelism The initial value is -1

Resize the thread pool


  • Among them:? N_{cppu}? AvailableProcessors is the number of cores in the processor.getruntime ().availableProcessors


Above is a reference formula “Java Concurrent Programming Practice” (MnG.bZ / 979C)

This is the theoretical value calculated, but when we use it, we need to consider the actual situation. For example, IF I have 5 parallel tasks, I need to start 5 threads to execute them separately. If more threads are used, millions of dollars will be wasted and the effect of concurrency will not be achieved. At this point we need five threads.

When inserting a database

public void save(a){
    CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
      UserEntity entity = UserEntity.builder()
        .id(1112)
        .userName("Scherling")
        .password("abc1213")
        .birthday("2018-08-08")
        .createUser("1")
        .createTime(LocalDateTime.now())
        .updateUser("2")
        .updateTime(LocalDateTime.now())
        .build();
      return userRepository.save(entity);
    });

    CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> {
      UserEntity entity = UserEntity.builder()
        .id(223)
        .userName("Scherling 1")
        .password("abc12131")
        .birthday("2018-08-18")
        .createUser("11")
        .createTime(LocalDateTime.now())
        .updateUser("21")
        .updateTime(LocalDateTime.now())
        .build();
      if (true) {
        throw new RuntimeException("Intentionally thrown exception...");
      }
      return userRepository.save(entity);
    });

    System.out.println(completableFuture.join());
    System.out.println(completableFuture1.join());
}
Copy the code

The test results show that the above data is inserted into the database properly, while the following data fails to be inserted. Transactions are not rolled back.

Combine two asynchronous computations into one, depending on (thenCompose)

Combine two asynchronous computations into one that is independent of each other, while the second depends on the result of the first

public void test(a){
  CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCompose(
    	(x) -> CompletableFuture.supplyAsync(() -> sleep(x))
  	);
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}
Copy the code

From the above code, you can see that the previous return value x is used in the calculation, and the entire task runs in 4 seconds.

Combine two asynchronous computations into one, whether dependent or not (the combine)

public void test(a) {
  CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCombine(
    	CompletableFuture.supplyAsync(() -> sleep(1)),
    	(t1, t2) -> t1 + t2
  );
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}
Copy the code
  • thenCombine
  • thenCombineAsync

The arguments received by both methods are the same, but the difference is in the second argument they receive: whether the BiFunction is committed to the thread pool and executed asynchronously by another task. ThenCombine will not execute BiFunction asynchronously whereas thenCombineAsync will execute asynchronously.

When to use Async suffix methods?

When our merge method is a time-consuming one, consider using the Async suffix whenever possible.

When inserting a database

When we insert a database, if one of the two operations fails, will it be rolled back?

@Transactional(rollbackFor = Exception.class)
public void save(a) {
  CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
    UserEntity entity = UserEntity.builder()
      .id(111)
      .userName("Scherling")
      .password("abc1213")
      .birthday("2018-08-08")
      .createUser("1")
      .createTime(LocalDateTime.now())
      .updateUser("2")
      .updateTime(LocalDateTime.now())
      .build();
    return userRepository.save(entity);
  }).thenCombine(CompletableFuture.supplyAsync(() -> {
    UserEntity entity = UserEntity.builder()
      .id(222)
      .userName("Scherling 1")
      .password("abc12131")
      .birthday("2018-08-18")
      .createUser("11")
      .createTime(LocalDateTime.now())
      .updateUser("21")
      .updateTime(LocalDateTime.now())
      .build();
    return userRepository.save(entity);
  }), (a, b) -> {
    System.out.println(a);
    System.out.println(b);
    return a;
  });

  UserEntity join = completableFuture.join();
  System.out.println(join);
}

Copy the code

When tested, the second task throws an exception that is rolled back.

Completion event for the CompletableFuture

Java 8’s CompletableFuture provides this functionality through the thenAccept method, which accepts the return value of the CompletableFuture as an argument.

public void test(a) {
  CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCombineAsync(
    CompletableFuture.supplyAsync(() -> sleep(1)),
    (t1, t2) -> t1 + t2
  ).thenAccept((t) -> System.out.println(t + "-- -- -- -- -- -"));
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

  return timeout;
}
Copy the code

The thread pool (ThreadPoolExecutor)

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) ;
Copy the code
  • Int corePoolSize: The size of the core pool. This parameter has a lot to do with how thread pools are implemented, as described below. After a thread pool is created, by default, there are no threads in the pool, but instead wait for a task to arrive before creating a thread to execute the task, unless the prestartAllCoreThreads() or prestartCoreThread() methods are called. As the names of these two methods suggest, The corePoolSize thread or one thread is created before the task arrives. By default, after a thread pool is created, the number of threads in the thread pool is zero. When a task arrives, a thread is created to execute the task. When the number of threads in the thread pool reaches corePoolSize, the incoming task is placed in the cache queue.

  • Int maximumPoolSize: specifies the maximum number of threads that can be created in the thread pool.

  • Long keepAliveTime: Indicates the maximum length of time a thread can hold without executing a task before terminating. By default, keepAliveTime works only when the number of threads in the thread pool is greater than corePoolSize, until the number of threads in the thread pool is no greater than corePoolSize: That is, when the number of threads in the thread pool is greater than corePoolSize, if a thread is idle for a keepAliveTime, it terminates until the number of threads in the thread pool does not exceed corePoolSize. But if the **allowCoreThreadTimeOut(Boolean)** method is called, the keepAliveTime parameter will also work if the number of threads in the pool is not greater than corePoolSize until the number of threads in the pool is zero.

  • TimeUnit Unit: Indicates the unit of keepAliveTime

  • BlockingQueue<Runnable> workQueue: a BlockingQueue used to store tasks waiting to be executed. The choice of this parameter can have a significant impact on the running of the thread pool. Generally, there are several options for blocking queues

    • ArrayBlockingQueue

    • LinkedBlockingQueue

  • PriorityBlockingQueue

    • SynchronousQueue

    ArrayBlockingQueue and PriorityBlockingQueue are less commonly used, while LinkedBlockingQueue and Synchronous are commonly used. The queue policy of the thread pool is related to BlockingQueue.

  • ThreadFactory threadFactory

  • RejectedExecutionHandler Handler: Implements the RejectedExecutionHandler interface to customize processors