CompletableFuture

概述

  1. 在Java8中 , CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法

  2. 它可能代表一个明确完成的 Future ,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作

  3. 它实现了 Future 和 CompletionStage 接口

  4. CompletionStage 接口说明

    1. CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段
    2. 一个阶段的计算执行可以是一个 Funcation、Consumer、Runnable。比如 : stage.thenApply (x->square(x)).thenAccept(x->System.out.println(x)).thenRun(()->{System.out.println()});
    3. 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发.有些类似 Linux 系统的管道分隔符传参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package tech.chen.juccode.a07;

import org.omg.PortableServer.ThreadPolicy;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @Date 2022/8/20 16:05
* @Author c-z-k
*/
public class CompletableDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace();}
return 1;
},threadPool).thenAccept((v)->{
System.out.println(v);
System.out.println(Thread.currentThread().getName());
}).whenComplete((v,e)->{
System.out.println(Thread.currentThread().getName());
}).exceptionally(e->{
System.out.println(Thread.currentThread().getName());
return null;
});

System.out.println(Thread.currentThread().getName()+"\t"+"over...");
//主线程不要立即结束,否则CompletableFuture默认使用的线程池会立即关闭,暂停几秒
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}

}
}

带了 Async 的方法表示的是:会重新在默认线程池中启动一个线程来执行任务

CompletableFuture创建

1
2
3
4
5
6
//runAsync方法不支持返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//supplyAsync可以支持返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

CompletableFuture API

获得结果和触发计算

  1. public T get( ) 不见不散(会抛出异常) 只要调用了get( )方法,不管是否计算完成都会导致阻塞

  2. public T get(long timeout, TimeUnit unit) 过时不候

  3. public T getNow(T valuelfAbsent) :没有计算完成的情况下,给我一个替代结果计算完,返回计算完成后的结果、没算完,返回设定的valuelfAbsent

  4. public T join( ) :join方法和get( )方法作用一样,不同的是,join方法不抛出异常

对计算结果进行处理

  1. thenApply(Function<? super T,? extends U> fn):计算结果存在依赖关系,这两个线程串行化。由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
  2. handle(BiFunction<? super T, Throwable, ? extends U> fn) : 有异常也可以往下一步走,根据带的异常参数可以进一步处理
  3. whenComplete : 是执行当前任务的线程执行继续执行whenComplete的任务
  4. whenCompleteAsync : 是执行把 whenCompleteAsync 这个任务继续提交给默认线程池(forkjoin)来进行执行

对计算结果进行消费

  1. thenRun(Runnable runnable) : 任务A执行完执行B,并且B不需要A的结果
  2. thenAccept(Consumer<? super T> action): 任务A执行完成执行B,B需要A的结果,但是任务B无返回值
  3. thenApply(Function<? super T,? extends U> fn) : 任务A执行完成执行B,B需要A的结果,同时任务B有返回值

对计算速度进行选用

  1. applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) 。这个方法表示的是,谁快就用谁的结果,类似于我们在打跑得快,或者麻将谁赢了就返回给谁
  2. 两任务组合,一个完成
  3. applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
  4. acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
  5. runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值

对计算结果进行合并

  1. thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理 。先完成的先等着,等待其他分支任务
  2. thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) 两个CompletionStage任务都完成后,最终把两个任务的结果一起交给 thenAcceptBoth 来处理 。无返回值
  3. runAfterBoth(CompletionStage<?> other,Runnable action) 两个CompletionStage任务都完成后,在开始处理 action 。无返回值。

多任务组合

  1. allOf: 等待所有任务完成

    (public static CompletableFuture<Void> allOf(CompletableFuture<?>… cfs))

  2. anyOf :只要有一个任务完成

    (public static CompletableFuture<Object> anyOf(CompletableFuture<?>… cfs))