上一篇我们讲了Future机制,有兴趣的可以参考谈谈Future、Callable、FutureTask关系
但Future机制,还不那么灵活,比如怎么去利用Future机制描述两个任务串行执行,又或是两个任务并行执行,又或是只关心最先执行结束的任务结果。
Future机制在一定程度上都无法快速地满足以上需求,CompletableFuture便应运而生了。
本片会介绍CompletableFuture的api,并用一些示例演示如何去使用。
复制
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor); public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
1.
2.
3.
4.
5.
6.
7.
supplyAsync与runAsync的区别在于:supplyAsync有返回值,而runAsync没有返回值
带Executor参数的构造函数,则使用线程池中的线程执行异步任务(线程池可以参考说说线程池)
不带Executor参数的构造函数,则使用ForkJoinPool.commonPool()中的线程执行异步任务(Fork/Join框架可以参考谈谈并行流parallelStream)
复制
public class Case1 {      public static void main(String[] args) throws Exception {          CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{             try {                 Thread.sleep(1000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             return 1;         });         //该方法会一直阻塞         Integer result = completableFuture.get();         System.out.println(result);     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
复制
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
1.
2.
3.
4.
5.
6.
7.
whenComplete开头的方法在计算任务完成(包括正常完成与出现异常)之后会回调
而exceptionally则只会在计算任务出现异常时才会被回调
如何确定哪个线程去回调whenComplete,比较复杂,先略过。
而回调whenCompleteAsync的线程比较简单,随便拿一个空闲的线程即可,后缀是Async的方法同理。
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case2 {      public static void main(String[] args) throws Exception {          CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {             try {                 Thread.sleep(1000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName());             int i = 1 / 0;             return 1;         });          completableFuture.whenComplete(new BiConsumer<Integer, Throwable>() {             @Override             public void accept(Integer integer, Throwable throwable) {                 System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName());                 if (throwable == null) {                     System.out.println("计算未出现异常,结果:" + integer);                 }             }         });          completableFuture.exceptionally(new Function<Throwable, Integer>() {             @Override             public Integer apply(Throwable throwable) {                 //出现异常时,则返回一个默认值                 System.out.println("计算出现异常,信息:" + throwable.getMessage());                 return -1;             }         });          System.out.println(completableFuture.get());     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
输出:
当然,CompletableFuture内的各种方法是支持链式调用与Lambda表达式的,我们进行如下改写:
复制
public static void main(String[] args) throws Exception {       CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {          try {              Thread.sleep(2000);          } catch (InterruptedException e) {              e.printStackTrace();          }          System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName());          int i = 1 / 0;          return 1;      }).whenComplete((integer, throwable) -> {          System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName());          if (throwable == null) {              System.out.println("计算未出现异常,结果:" + integer);          }      }).exceptionally(throwable -> {          //出现异常时,则返回一个默认值          System.out.println("计算出现异常,信息:" + throwable.getMessage());          return -1;      });       System.out.println("计算结果:" + completableFuture.get());  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
复制
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
1.
2.
3.
4.
5.
6.
7.
8.
9.
thenApply,依赖上一次任务执行的结果,参数中的Function<? super T,? extends U>,T代表上一次任务返回值的类型,U代表当前任务返回值的类型,当上一个任务没有出现异常时,thenApply才会被调用
thenRun,不需要知道上一个任务的返回结果,只是在上一个任务执行完成之后开始执行Runnable
thenAccept,依赖上一次任务的执行结果,因为入参是Consumer,所以不返回任何值。
handle和thenApply相似,不过当上一个任务出现异常时,能够执行handle,却不会去执行thenApply
thenCompose,传入一次任务执行的结果,返回一个新的CompleteableFuture对象
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case4 {      public static void main(String[] args) {                  CompletableFuture.supplyAsync(() -> 2)                 .thenApply(num -> num * 3)                 .thenAccept(System.out::print);     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
很显然,输出为6
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case4 {      public static void main(String[] args) {          CompletableFuture.supplyAsync(() -> 2)                 .thenApply(num -> num / 0)                 .thenApply(result -> result * 3)                 .handle((integer, throwable) -> {                     if (throwable == null) {                         return integer;                     } else {                         throwable.printStackTrace();                         return -1;                     }                 }).thenAccept(System.out::print);     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
最终会输出-1
复制
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, Function<? super T,? super U,? extends V> fn); public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, Consumer<? super T, ? super U> action); public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
1.
2.
3.
4.
5.
6.
7.
8.
9.
thenCombine,合并两个任务,两个任务可以同时执行,都执行成功后,执行最后的BiFunction操作。其中T代表第一个任务的执行结果类型,U代表第二个任务的执行结果类型,V代表合并的结果类型
thenAcceptBoth,和thenCombine特性用法都极其相似,唯一的区别在于thenAcceptBoth进行一个消费,没有返回值
runAfterBoth,两个任务都执行完成后,但不关心他们的返回结构,然后去执行一个Runnable。
allOf,当所有的任务都执行完成后,返回一个CompletableFuture
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case5 {      public static void main(String[] args) throws Exception {          CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {             System.out.println("任务1开始");             try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务1结束");             return 2;         });          CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {             System.out.println("任务2开始");             try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务2结束");             return 3;         });          CompletableFuture<Integer> completableFuture = cf1.thenCombine(cf2, (result1, result2) -> result1 * result2);         System.out.println("计算结果:" + completableFuture.get());     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
输出:
可以看到两个任务确实是同时执行的
当然,熟练了之后,直接使用链式操作,代码如下:
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case6 {      public static void main(String[] args) throws Exception {          CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("任务1开始");             try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务1结束");             return 2;         }).thenCombine(CompletableFuture.supplyAsync(() -> {             System.out.println("任务2开始");             try {                 Thread.sleep(2000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务2结束");             return 3;         }), (result1, result2) -> result1 * result2);          System.out.println("计算结果:" + completableFuture.get());     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
复制
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
1.
2.
3.
4.
5.
6.
7.
applyToEither,最新执行完任务,将其结果执行Function操作,其中T是最先执行完的任务结果类型,U是最后输出的类型
acceptEither,最新执行完的任务,将其结果执行消费操作
runAfterEither,任意一个任务执行完成之后,执行Runnable操作
anyOf,多个任务中,返回最先执行完成的CompletableFuture
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case7 {      public static void main(String[] args) throws Exception {          CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("任务1开始");             try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务1结束");             return 2;         }).acceptEither(CompletableFuture.supplyAsync(() -> {             System.out.println("任务2开始");             try {                 Thread.sleep(2000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务2结束");             return 3;         }), result -> System.out.println(result));          //等待CompletableFuture返回,防止主线程退出         completableFuture.join();     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
输出:
可以看得到,任务2结束后,直接不再执行任务1的剩余代码
复制
package com.qcy.testCompleteableFuture;  import java.util.concurrent.CompletableFuture;  /**  * @author qcy  * @create 2020/09/07 17:40:44  */ public class Case8 {      public static void main(String[] args) throws Exception {          CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {             System.out.println("任务1开始");             try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务1结束");             return 2;         });          CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {             System.out.println("任务2开始");             try {                 Thread.sleep(2000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务2结束");             return 3;         });          CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {             System.out.println("任务3开始");             try {                 Thread.sleep(4000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务3结束");             return 4;         });          CompletableFuture<Object> firstCf = CompletableFuture.anyOf(cf1, cf2, cf3);         System.out.println(firstCf.get());     }  }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
输出: