Let’s talk about the Future

Callable has roughly the same function as Runnable, but the call() function returns a value. Callable is typically used in conjunction with ExecutorService

A Future is a Future that cancels and queries the execution result of a specific Runnable or Callable task

Five methods are declared in the Future interface

  • The cancel method is used to cancel a task, returning true on success or false on failure.
  • The isCancelled method indicates whether the task was cancelled successfully or returns true if the task was cancelled before it completed normally.
  • The isDone method indicates whether the task is complete, and returns true if it is.
  • The get() method is used to get the result of the execution, which blocks and does not return until the task is finished.
  • Get (long timeout, TimeUnit Unit) is used to obtain the execution result. If no result is obtained within the specified time, null is returned.

In other words, Future provides three functions:

  • Judge whether the task is completed;
  • Can interrupt the task;
  • Obtain the task execution results.

Because a Future is just an interface, it cannot be used directly to create objects, hence FutureTask.

To two demo:

public static void futureDemo1(a) throws ExecutionException, InterruptedException {
    ThreadPoolExecutor pool = CommonThreadPool.getPool();    
    Future<Integer> f = pool.submit(() -> {      
        // Long time asynchronous computation
        Thread.sleep(2000);      
        // Then return the result
        return 100;    
    });    
    while(! f.isDone()) { System.out.println(System.currentTimeMillis() +"It's not over.");   
    }    
    // Get the result
    System.out.println(f.get());
  }
Copy the code

The Future only implements asynchron, not callback, and the main thread gets blocks, which can be polled to see if the asynchronous call is complete. In practice, it is recommended to use Guava ListenableFuture to implement asynchronous non-blocking. The purpose is to perform multiple tasks asynchronously, and obtain the execution result through callback without polling the task status.

public static void futureDemo2(a) {
    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(CommonThreadPool.getPool());
    IntStream.rangeClosed(1.10).forEach(i -> {      
        ListenableFuture<Integer> listenableFuture = executorService.submit(() -> {            
            // Long time asynchronous computation
            // Thread.sleep(3000);            
            // Then return the result
            return 100;          
        });
      Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {        
          @Override        
          public void onSuccess(Integer result) {          
              System.out.println("get listenable future's result with callback " + result);        
          }
          @Override        
          public void onFailure(Throwable t) {          
              t.printStackTrace();        
          }      
      }, executorService);    
    });  
}
Copy the code

CompletableFuture

Futrue is very inconvenient to obtain the results, and can only get the results of the task through blocking or polling.

In Java 8, a new class with about 50 methods, CompletableFuture, provides a very powerful extension to Future.

CompletableFuture can execute the callback in a different thread from the task, or it can execute the callback in the same thread as the task as a synchronization function that continues execution. It avoids the biggest problem with traditional callbacks, which is the ability to separate the flow of control into different event handlers.

CompletableFuture makes up for the shortcomings of the Future mode. After an asynchronous task completes, there is no need to wait to continue with the results. ThenAccept, thenApply, thenCompose, and so on can be used directly to hand the results of the previous asynchronous processing to another asynchronous event processing thread.

The following examples illustrate CompletableFuture

Asynchronous execution

/** * * public static CompletableFuture
      
        runAsync(Runnable runnable) * public static CompletableFuture
       
         runAsync(Runnable runnable, Executor executor) * public static 
         CompletableFuture supplyAsync(Supplier supplier) * public static  CompletableFuture supplyAsync(Supplier supplier, Executor Executor) * * A method with an Async ending and no Executor specified uses ForkJoinPool.commonPool() as its thread pool to execute asynchronous code. * * The runAsync method is easy to understand. It takes a Runnable functional interface type as an argument, so the CompletableFuture is null. The * * supplyAsync method takes the Supplier functional interface type as an argument, and the CompletableFuture calculates the result of type U. * /
       
      
public static void runAsyncExample(a) throws ExecutionException, InterruptedException {

    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
      System.out.println("Abnormal code execution");
    });

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      // Long computing tasks
      return "00",;
    });

    System.out.println(future.join());

  }
Copy the code

Processing of calculation results when they are completed

/** ** We can execute specific actions when the CompletableFuture completes the calculation, or when an exception is thrown. The main method is: * * whenComplete(BiConsumer
       action) public CompletableFuture
      
        * whenCompleteAsync(BiConsumer
        action) public CompletableFuture
       
         * whenCompleteAsync(BiConsumer
         action, Executor executor) public * CompletableFuture
        
          exceptionally(Function
         
           fn) * * Methods that do not end with Async are evaluated by the original thread. Methods that end with Async are run by the default thread pool, ForkJoinPool.commonPool(), or by the specified thread pool, Executor. * Java's CompletableFuture class always follows this principle * * If you want to execute a piece of code whether the CompletableFuture is running properly or not, such as releasing resources, updating state, logging, etc., without affecting the original execution result. * Then you can use the whenComplete method. Exceptionally is very similar to catch(), and whenComplete is very similar to finally: */
         ,?>
        
       
      
  public static void whenComplete(a) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
      @Override
      public Integer get(a) {
        return 2323; }}); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println(f.get()); }Copy the code

Handle is the processing of the result when a task is completed

private static class HttpResponse {

    private final int status;
    private final String body;

    public HttpResponse(final int status, final String body) {
      this.status = status;
      this.body = body;
    }

    @Override
    public String toString(a) {
      return status + "-"+ body; }}/** * Handle is the processing of the result when the task is completed. The Handle method is basically the same as thenApply. The difference is that Handle is executed after the task is completed and can handle abnormal tasks. * public  CompletionStage handle(BiFunction
         fn); * public  CompletionStage handleAsync(BiFunction
           fn); * public  CompletionStage handleAsync(BiFunction
             fn,Executor executor); * * thenApply can only execute normal tasks. If a task is abnormal, thenApply is not executed. * public  CompletableFuture thenApply(Function
               fn) * public  CompletableFuture thenApplyAsync(Function
                 fn) * public  CompletableFuture thenApplyAsync(Function
                   fn, Executor executor) */
  public static void handle(a) throws ExecutionException, InterruptedException {

    for (final boolean failure : new boolean[] {false.true}) {

      CompletableFuture<Integer> x = CompletableFuture.supplyAsync(() -> {
        if (failure) {
          throw new RuntimeException("Oops, something went wrong");
        }
        return 42;
      });

      /** * Returns a new CompletableFuture that, when this CompletableFuture completes either normally or exceptionally, * is executed with this stage's result and exception as arguments to the supplied function. */
      CompletableFuture<HttpResponse> tryX = x
          // Note that tryX and x are of different type.
          .handle((value, ex) -> {
            if(value ! =null) {
              // We get a chance to transform the result...
              return new HttpResponse(200, value.toString());
            } else {
              // ... or return details on the error using the ExecutionException's message:
              return new HttpResponse(500, ex.getMessage()); }});// Blocks (avoid this in production code!) , and either returns the promise's value:
      System.out.println(tryX.get());
      System.out.println("isCompletedExceptionally = " + tryX.isCompletedExceptionally());

    }
Copy the code

conversion

/** * convert *@throws ExecutionException
   * @throws InterruptedException
   */
  public static void thenApply(a) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<String> f =  future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString());
    / / "1000"
    System.out.println(f.get());
  }
Copy the code

Action

WhenComplete * CompletableFuture also provides a way to process the result, Void: * * public CompletableFuture
      
        thenAccept(Consumer
        action) * public CompletableFuture
       
         thenAcceptAsync(Consumer
         action) * public CompletableFuture
        
          thenAcceptAsync(Consumer
          action, Executor executor) */
        
       
      
  public static void action(a) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<Void> f =  future.thenAccept(System.out::println);
    System.out.println(f.get());

  }
Copy the code

thenAccept

/** * thenAcceptBoth and related methods provide similar functionality in that when both CompletionStages have completed their calculations properly, the provided action is executed to combine another asynchronous result. * runAfterBoth is a Runnable executed when both completionstages have completed the calculation normally, without using the result of the calculation. * * public  CompletableFuture
       
         thenAcceptBoth(CompletionStage
         other, BiConsumer
         action) * public 
         CompletableFuture
         
           thenAcceptBothAsync(CompletionStage
           other, BiConsumer
           action) * public 
           CompletableFuture
           
             thenAcceptBothAsync(CompletionStage
             other, BiConsumer
             action, Executor executor) * public CompletableFuture
            
              runAfterBoth(CompletionStage
              other, Runnable action) */
            
           
         
       
  public static void thenAcceptBoth(a) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<Void> f =  future.thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y));
    System.out.println(f.get());

  }
Copy the code

thenRun

/** * A Runnable is executed when the calculation is complete. Unlike thenAccept, Runnable does not use the result of the CompletableFuture calculation. * * public CompletableFuture
      
        thenRun(Runnable action) * public CompletableFuture
       
         thenRunAsync(Runnable action) * public CompletableFuture
        
          thenRunAsync(Runnable action, Executor executor) */
        
       
      
  public static void  thenRun(a) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<Void> f =  future.thenRun(() -> System.out.println("finished"));
    System.out.println(f.get());

  }

Copy the code

composite

/** * thenCombine is used to compound the result of another CompletionStage. It functions like A + * * * * + -- -- -- -- -- - > | + -- -- -- -- -- - ^ C * * * * B + two CompletionStage is executed in parallel, between them is not depend on the order, Other does not wait for the previous CompletableFuture to complete. * * public 
      
        CompletableFuture
       
         thenCombine(CompletionStage
         other, BiFunction
         fn) * public 
        
          CompletableFuture
         
           thenCombineAsync(CompletionStage
           other, BiFunction
           fn) * public 
          
            CompletableFuture
           
             thenCombineAsync(CompletionStage
             other, BiFunction
             fn, Executor Executor) * * Functionally, they are more similar to thenAcceptBoth, except that thenAcceptBoth is pure consumption. ThenCombine's function argument fn has a return value. * /
           
          ,v>
         
        ,v>
       
      ,v>
  public static void thenCombine(a) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      return "abc";
    });
    CompletableFuture<String> f =  future.thenCombine(future2, (x,y) -> y + "-" + x);
    System.out.println(f.get()); //abc-100

  }
Copy the code

combination

/** * This set of methods takes a Function as an argument. The input to this Function is the computed value of the current CompletableFuture, and the return result will be a new CompletableFuture, * This new CompletableFuture will combine the original CompletableFuture with the CompletableFuture returned by the function. So its function is similar: The object returned by A +--> B +--> C * * thenCompose is not the object returned by the fn function, and if the original CompletableFuture has not been calculated, it will generate A new composed CompletableFuture. * /
  public static void thenCompose(a) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<String> f =  future.thenCompose( i -> {
      return CompletableFuture.supplyAsync(() -> {
        return (i * 10) + "";
      });
    });
    System.out.println(f.get()); / / 1000

  }
Copy the code

Either

/** * Either represents two CompleTableFutures, which are executed when Either CompletableFuture is evaluated. * * public CompletableFuture
      
        acceptEither(CompletionStage
        other, Consumer
        action) * public CompletableFuture
       
         acceptEitherAsync(CompletionStage
         other, Consumer
         action) * public CompletableFuture
        
          acceptEitherAsync(CompletionStage
          other, Consumer
          action, Executor executor) * public 
          CompletableFuture applyToEither(CompletionStage
            other, Function
            fn) * public  CompletableFuture applyToEitherAsync(CompletionStage
              other, Function
              fn) * public  CompletableFuture applyToEitherAsync(CompletionStage
                other, Function
                fn, Executor Executor) * * acceptEither is a consumer action that will be executed when any CompletionStage is completed. This method returns The CompletableFuture
               
                 * * applyToEither method is executed when any CompletionStage is completed, Its return value is treated as the result of the new CompletableFuture
                 calculation. * /
               
        
       
      
  public static void either(a) {

    Random random = new Random();

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {

      try {
        Thread.sleep(random.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      return "from future1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {

      try {
        Thread.sleep(random.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      return "from future2";
    });

    CompletableFuture<Void> haha = future1
        .acceptEitherAsync(future2, str -> System.out.println("The future is " + str));

    try {
      System.out.println(haha.get());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch(ExecutionException e) { e.printStackTrace(); }}Copy the code

All

/** * the allOf method executes the calculation when all completableFutures have been executed. * anyOf accepts any number of CompletableFutures * * anyOf is a method that computes when any CompletableFuture is finished, with the same result. * /
  public static void allOfAndAnyOf(a) throws ExecutionException, InterruptedException {

    Random rand = new Random();
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(10000 + rand.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return 100;
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(10000 + rand.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return "abc";
    });
    //CompletableFuture
      
        f = CompletableFuture.allOf(future1,future2);
      
    CompletableFuture<Object> f =  CompletableFuture.anyOf(future1,future2);
    System.out.println(f.get());

  }
Copy the code

AllOf how to end all quickly if one of them fails?

/** * allOf how to quickly end all if one of them fails? * * By default, allOf waits for all tasks to complete, and even if one of them fails, it does not affect the other tasks to continue. But in most cases, the failure of one task means the failure of the whole task, and there is little point in continuing to complete the remaining tasks. * On Guava's allAsList, the entire task is canceled if one of the tasks fails: * * One approach is to capture each CompletableFuture's exceptionally method in the allOf array: If there is an exception, then the entire allOf throws that exception directly: */

  public static void allOfOneFail(a){
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println("-- future1 -->");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      System.out.println("<-- future1 --");
      return "Hello";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      System.out.println("-- future2 -->");
      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      System.out.println("<-- future2 --");
      throw new RuntimeException("Oops!");
    });

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
      System.out.println("-- future3 -->");
      try {
        Thread.sleep(4000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      System.out.println("<-- future3 --");
      return "world";
    });

    // CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
    // combinedFuture.join();

    CompletableFuture<Void> allWithFailFast = CompletableFuture.allOf(future1, future2, future3);
    Stream.of(future1, future2, future3).forEach(f -> f.exceptionally(e -> {
      allWithFailFast.completeExceptionally(e);
      return null;
    }));

    allWithFailFast.join();
  }
Copy the code

A demo of my own

/** * suppose you have a set, and you need to request N interfaces. * /
  public static void myDemo(a){

    ArrayList<String> strings = Lists.newArrayList("1"."2"."3"."4");

    CompletableFuture[] cfs = strings.stream()
        .map(s -> CompletableFuture.supplyAsync(() -> {
          return s + "$";
        }).thenAccept(s1 -> {
          System.out.println(s1+ "#");
        }).exceptionally(t -> {
          return null;
        })).toArray(CompletableFuture[]::new);

    // Wait for the future to complete
    CompletableFuture.allOf(cfs).join();

  }
Copy the code