java并发工具类-CompletableFuture
前言 CompletableFuture类实现了CompletionStage和Future接口。Future是Java 5添加的类,用来描述一个异步计算的结果,但是获取一个结果时方法较少,要么通过轮询isDone,确认完成后,调用get()获取值,要么调用get()设置一个超时时间。但是这个get()方法会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。
为了解决这个问题,JDK吸收了guava的设计思想,加入了Future的诸多扩展功能形成了CompletableFuture。
敲黑板:以Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行
Future
Future 是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class BasicFuture { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10 ); Future<Integer> f = es.submit(() ->{ return 100 ; }); f.get(); } }
虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
CompletableFuture简介 CompletableFuture 是Java 8 新增加的Api,该类实现,Future和CompletionStage两个接口,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法
异步任务创建
CompletableFuture
提供了四个静态方法用来创建CompletableFuture对象:
1 2 3 4 public static CompletableFuture<Void> runAsync (Runnable runnable) public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor)
Asynsc
表示异步,而supplyAsync
与runAsync
不同在与前者异步返回一个结果,后者是void.第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
作为它的线程池.其中Supplier
是一个函数式接口,代表是一个生成者的意思,传入0个参数,返回一个结果。
当传入Executor会使用指定线程池执行,如果没有传入则使用默认ForkJoinPool.commonPool()执行,值得注意的是,commonPool中都是守护线程,主线程执行完,子线程也就over了。因此建议当任务非常耗 时,使用自定义线程池。
runAsync方法不支持返回值。
supplyAsync可以支持返回值。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void runAsyncTest () throws Exception { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } System.out.println("run end ..." ); }); future.get(); } public static void supplyAsyncTest () throws Exception { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } System.out.println("run end ..." ); return System.currentTimeMillis(); }); long time = future.get(); System.out.println("time = " + time); }
主动计算
以下4个方法用于获取结果
1 2 3 4 5 6 7 8 9 10 11 public T get () public T get (long timeout, TimeUnit unit) public T getNow (T valueIfAbsent) public T join () public boolean complete (T value)
getNow
有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。join()
与get()
区别在于join()
返回计算的结果或者抛出一个unchecked异常(CompletionException),而get()
返回一个具体的异常.
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void completableFutureGet () throws Exception{ CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } return 100 * 10 ; }); System.out.println(completableFuture.get()); System.out.println(completableFuture.get(1000 , TimeUnit.MICROSECONDS)); System.out.println(completableFuture.join()); System.out.println(completableFuture.getNow(1 )); }
主动触发计算
如果 CompletableFuture 没有关联任何的Callback、异步任务等,如果调用get方法,那会一直阻塞下去,可以使用complete方法主动完成计算。
1 2 public boolean complete (T value) public boolean completeExceptionally (Throwable ex)
上面方法表示当调用CompletableFuture.get()
被阻塞的时候,那么这个方法就是结束阻塞,并且get()
获取设置的value.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public static CompletableFuture<Integer> compute () { final CompletableFuture<Integer> future = new CompletableFuture <>(); return future; } public static void main (String[] args) throws Exception { final CompletableFuture<Integer> f = compute(); class Client extends Thread { CompletableFuture<Integer> f; Client(String threadName, CompletableFuture<Integer> f) { super (threadName); this .f = f; } @Override public void run () { try { System.out.println(this .getName() + ": " + f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } new Client ("Client1" , f).start(); new Client ("Client2" , f).start(); System.out.println("waiting" ); f.complete(100 ); Thread.sleep(1000 ); }
多任务依赖执行
在异步编程中,有时会涉及到异步任务间存在依赖关系,如第二个任务的执行需要依赖与第一个任务的执行结果,对于这种需求,CompletableFuture中也提供了方法实现
thenCompose
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
1 2 3 public <U> CompletableFuture<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
以上接收类型为 Function<? super T,? extends CompletionStage> fn ,fn 接收上一级返回的结果,并返回一个新的 CompletableFuture
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static ExecutorService executorService = Executors.newFixedThreadPool(4 );private static void thenComposeTest () throws Exception { CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); System.out.println("t1=" + t); return t; }, executorService).thenCompose((x) -> { return CompletableFuture.supplyAsync(() -> { int t = x * 2 ; System.out.println("t2=" + t); return t; }); }); System.out.println("thenCompose result : " + f.get()); }
thenApply
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
CompletableFuture 由于有回调,可以不必等待一个计算完成而阻塞着调用线程,可以在一个结果计算完成之后紧接着执行某个Action。我们可以将这些操作串联起来。
1 2 3 public <U> CompletableFuture<U> thenApply (Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn, Executor executor)
首先说明一下已Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行,下文中将会有好多类似的,都不详细解释了。关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。
和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。
参数解释
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private static ExecutorService executorService = Executors.newFixedThreadPool(4 );private static void thenApplyTest () throws Exception { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { long result = new Random ().nextInt(100 ); System.out.println("result1=" + result); return result; }, executorService).thenApply((x) -> { long result = x * 5 ; System.out.println("result2=" + result); return result; }); long result = future.get(); System.out.println(result); }
第二个任务依赖第一个任务的结果。
这些方法不是马上执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。
注意:如果出现异常thenApply
将不会被执行
handle
handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
1 2 3 public <U> CompletableFuture<U> handle (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn,Executor executor) ;
与whenComplete()
不同的是这个函数返回CompletableFuture
并不是原始的CompletableFuture
返回的值,而是BiFunction
返回的值,handle 方法和whenComplete方法类似,只不过接收的是一个 BiFunction<? super T,Throwable,? extends U> fn 类型的参数,因此有 whenComplete 方法和 转换的功能 (thenApply)
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static ExecutorService executorService = Executors.newFixedThreadPool(4 );public static void handleTest () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i = 10 / 0 ; return new Random ().nextInt(10 ); }, executorService).handle((x, e) -> { int result = -1 ; if (e == null ) { result = x * 2 ; } else { System.out.println(e.getMessage()); } return result; }); System.out.println(future.get()); }
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
thenAccept
接收任务的处理结果,并消费处理,无返回结果。
1 2 3 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action,Executor executor) ;
当将多个任务连接起来执行时,有时最终是不需要返回结果,CompletableFuture中也提供了方法实现,thenAccept()使用与上述方法类似,接收任务执行结果,并使用,但其没有结果返回。
示例 1 2 3 4 5 6 7 8 public static void thenAcceptTest () throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return new Random ().nextInt(10 ); }).thenAccept(x -> { System.out.println(x); }); future.get(); }
从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。
thenRun
跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept ,thenRun它的入参是一个Runnable的实例,表示当得到上一步的结果时的操作。
1 2 3 public CompletableFuture<Void> thenRun (Runnable action) ;public CompletableFuture<Void> thenRunAsync (Runnable action) ;public CompletableFuture<Void> thenRunAsync (Runnable action,Executor executor) ;
thenRun()与thenAccept()使用基本相同,都是不会进行结果返回,但不同的是,thenRun()不关心方法是否有结 果,只要它完成,就会触发其执行。
示例 1 2 3 4 5 6 7 8 public static void thenRunTest () throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return new Random ().nextInt(10 ); }).thenRun(() -> { System.out.println("thenRun ..." ); }); future.get(); }
该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。
异步计算结果触发回调 whenComplete
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
1 2 3 public CompletableFuture<T> whenComplete (BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync (BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync (BiConsumer<? super T,? super Throwable> action, Executor executor)
上面4个方法是当计算阶段结束的时候触发,BiConsumer
有两个入参,分别代表计算返回值,另外一个是异常.无返回值.方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
whenComplete 和 whenCompleteAsync 的区别
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void whenCompleteTest () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } if (new Random ().nextInt() % 2 >= 0 ) { int i = 12 / 0 ; } System.out.println("run end ..." ); return 111 ; }); future.whenComplete((x, e) -> { System.out.println("执行完成!" + e.getMessage()); }); TimeUnit.SECONDS.sleep(2 ); System.out.println("执行结果:" + future.get()); }
exceptionally exceptionally()与上述两个方法类似,都是用于当异步任务结束后,执行特定处理,但不同的是,上述两个方法即 可以处理正常的返回结果也可以处理异常,而exceptionally()只对异常进行处理,且其使用的是主线程。
1 2 public CompletableFuture<T> exceptionally (Function<Throwable,? extends T> fn)
示例
exceptionally比较复杂,需要通过4个实例才能真正明白
示例1
这段代码,由于会抛出异常,会先走whenCompleteAsync,然后再走exceptionally,而且是无法获取到返回值的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void exceptionallyTest1 () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } if (new Random ().nextInt() % 2 >= 0 ) { int i = 12 / 0 ; } return 111 ; }); future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass())); }).exceptionally(exception -> { System.out.println("计算执行过程中发生了异常,exception:" + exception.getClass()); return 333 ; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); }
执行结果
1 2 3 ............ 计算已执行完毕,exception:class java.util.concurrent.CompletionException 计算执行过程中发生了异常,exception:class java.util.concurrent.CompletionException
示例2
这里的打印结果是和上面类似的,可是为什么这次要获取新的CompletableFuture对象呢?看下面的exceptionally实例3后,再回来对比吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void exceptionallyTest2 () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } if (new Random ().nextInt() % 2 >= 0 ) { int i = 12 / 0 ; } return 111 ; }); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getMessage())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("计算执行过程中发生了异常,exception:" + exception.getMessage()); return 333 ; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); System.out.println("执行到最后一段代码了,future3 result:" + future3.get()); }
执行结果
1 2 3 计算已执行完毕,result:null 计算已执行完毕,exception:java.lang.ArithmeticException: / by zero 计算执行过程中发生了异常,exception:java.lang.ArithmeticException: / by zero
示例3
这因为如果抛出了异常,future的get方法是执行不到的;而如果没有抛出异常的话,还是会返回原始的CompletableFuture的值的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static void exceptionallyTest3 () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } if (new Random ().nextInt() % 2 >= 0 ) { int i = 12 / 0 ; } return 111 ; }); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getMessage())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("计算执行过程中发生了异常,exception:" + exception.getMessage()); return 333 ; }); System.out.println("执行到最后一段代码了,future3 result:" + future3.get()); }
执行结果
1 2 3 4 计算已执行完毕,result:null 计算已执行完毕,exception:java.lang.ArithmeticException: / by zero 计算执行过程中发生了异常,exception:java.lang.ArithmeticException: / by zero 执行到最后一段代码了,future3 result:333
示例4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void exceptionallyTest4 () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { } return 111 ; }); CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> { System.out.println("计算已执行完毕,result:" + result); System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getMessage())); }); CompletableFuture<Integer> future3 = future2.exceptionally(exception -> { System.out.println("计算执行过程中发生了异常,exception:" + exception.getMessage()); return 0 ; }); System.out.println("执行到最后一段代码了,future result:" + future.get()); System.out.println("执行到最后一段代码了,future2 result:" + future2.get()); System.out.println("执行到最后一段代码了,future3 result:" + future3.get()); }
执行结果
1 2 3 4 5 执行到最后一段代码了,future result:111 计算已执行完毕,result:111 计算已执行完毕,exception:无异常 执行到最后一段代码了,future2 result:111 执行到最后一段代码了,future3 result:111
两个任务全部完成触发
在进行多异步任务执行时,有时不光要让任务之间串联执行,有时还要将多个任务执行结果进行合并处理, CompletableFuture中也提供了一些方法实现。
thenCombine
当两个异步任务都执行完毕后,它可以将两个任务进行合并,获取到两个任务的执行结果,进行合并处理,最后会 有返回值。
1 2 3 public <U,V> CompletableFuture<V> thenCombine (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor) ;
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。
thenCombine 和 supplyAsync 不一定哪个先哪个后,是并行执行的。
示例 1 2 3 4 5 6 7 8 9 10 11 12 private static void thenCombineTest () throws Exception { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { return "hello" ; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { return "word" ; }); CompletableFuture<String> result = future1.thenCombine(future2, (x, y) -> { return x + " " + y; }); System.out.println(result.get()); }
thenAcceptBoth
当两个CompletableFuture都执行完成后,把结果一块交给thenAcceptBoth来进行处理
thenAcceptBoth()使用与thenCombine()类似,当两个任务执行完,获取两个任务的结果进行特定处理,但 thenAcceptBoth()没有返回值
1 2 3 public <U> CompletableFuture<Void> thenAcceptBoth (CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) ;public <U> CompletableFuture<Void> thenAcceptBothAsync (CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) ;public <U> CompletableFuture<Void> thenAcceptBothAsync (CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) ;
thenAccept 相比,参数类型多了一个 CompletionStage<? extends U> other,以上方法会接收上一个CompletionStage返回值,和当前的一个。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private static void thenAcceptBothTest () throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1=" + t); return t; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2=" + t); return t; }); CompletableFuture future = f1.thenAcceptBoth(f2, (x, y) -> { System.out.println("f1=" + x + ";f2=" + y + ";" ); }); System.out.println(future.get()); }
runAfterBoth
当两个任务执行完毕,触发特定任务处理,但不要两个异步任务结果,且不会进行值返回。
runAfterBoth 和以上方法不同,传一个 Runnable 类型的参数,不接收上一级的返回值
并且两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
1 2 3 public CompletableFuture<Void> runAfterBoth (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other,Runnable action,Executor executor) ;
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private static void runAfterBothTest () throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1=" + t); return t; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2=" + t); return t; }); CompletableFuture future = f1.runAfterBoth(f2, () -> { System.out.println("上面两个任务都执行完成了。" ); }); System.out.println(future.get()); }
两个任务任意一个完成触发 applyToEither
总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。
两个CompletionStage,谁执行返回的结果快,我就用那个CompletableFuture的结果进行下一步的转化操作。
1 2 3 public <U> CompletableFuture<U> applyToEither (CompletionStage<? extends T> other,Function<? super T, U> fn) ;public <U> CompletableFuture<U> applyToEitherAsync (CompletionStage<? extends T> other,Function<? super T, U> fn) ;public <U> CompletableFuture<U> applyToEitherAsync (CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor) ;
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private static void applyToEitherTest () throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1=" + t); return t; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2=" + t); return t; }); CompletableFuture<Integer> result = f1.applyToEither(f2, (x) -> { System.out.println(x); return x * 2 ; }); System.out.println(result.get()); }
acceptEither
acceptEither()的使用效果与applyToEither()类似,但acceptEither()没有返回值
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
1 2 3 public CompletableFuture<Void> acceptEither (CompletionStage<? extends T> other,Consumer<? super T> action) ;public CompletableFuture<Void> acceptEitherAsync (CompletionStage<? extends T> other,Consumer<? super T> action) ;public CompletableFuture<Void> acceptEitherAsync (CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor) ;
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static void acceptEitherTest () throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1=" + t); return t; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2=" + t); return t; }); f1.acceptEither(f2, (x) -> { System.out.println(x); }); }
runAfterEither
当两个任务执行,只要有一个任务执行完,则触发特定处理执行,无需使用异步任务的执行结果,且特定处理不会 进行值的返回。
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
1 2 3 public CompletableFuture<Void> runAfterEither (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other,Runnable action,Executor executor) ;
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static void runAfterEitherTest () throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1=" + t); return t; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { int t = new Random ().nextInt(3 ); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2=" + t); return t; }); f1.runAfterEither(f2, () -> { System.out.println("上面有一个已经完成了。" ); }); }
多任务组合执行
刚才的操作异步任务的数量,只能局限在两个,现在如果需要有任意多个异步任务进行组合操作的话, CompletableFuture中也提供了对应方法进行实现
allOf
这个方法的意思是把有方法都执行完才往下执行,没有返回值
1 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs)
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void complateFuturAllOf () throws Exception { Random random = new Random (); CompletableFuture.allOf( CompletableFuture.runAsync(() -> { try { Thread.sleep(random.nextInt(5000 )); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(1 ); }), CompletableFuture.runAsync(() -> { try { Thread.sleep(random.nextInt(1000 )); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(2 ); })) .get(); }
anyOf
任务一个方法执行完都往下执行,返回一个Object类型的值
1 public static CompletableFuture anyOf (CompletableFuture<?>… cfs)
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void complateFuturAnyOf () throws Exception { Random random = new Random (); Object o = CompletableFuture.anyOf( CompletableFuture.runAsync(() -> { try { Thread.sleep(random.nextInt(4000 )); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(1 ); }), CompletableFuture.runAsync(() -> { try { Thread.sleep(random.nextInt(1000 )); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(2 ); })) .get(); System.out.println(o); }