抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

java并发工具类-CompletableFuture

前言

​ CompletableFuture类实现了CompletionStage和Future接口。Future是Java 5添加的类,用来描述一个异步计算的结果,但是获取一个结果时方法较少,要么通过轮询isDone,确认完成后,调用get()获取值,要么调用get()设置一个超时时间。但是这个get()方法会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。

​ 为了解决这个问题,JDK吸收了guava的设计思想,加入了Future的诸多扩展功能形成了CompletableFuture。

敲黑板:以Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行

Future

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class BasicFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
Future<Integer> f = es.submit(() ->{
// 长时间的异步计算
// ……
// 然后返回结果
return 100;
});
// while(!f.isDone())
// ;
f.get();
}
}

​ 虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

CompletableFuture简介

​ CompletableFuture 是Java 8 新增加的Api,该类实现,Future和CompletionStage两个接口,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法

异步任务创建

CompletableFuture提供了四个静态方法用来创建CompletableFuture对象:

1
2
3
4
public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Asynsc表示异步,而supplyAsyncrunAsync不同在与前者异步返回一个结果,后者是void.第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池.其中Supplier是一个函数式接口,代表是一个生成者的意思,传入0个参数,返回一个结果。

​ 当传入Executor会使用指定线程池执行,如果没有传入则使用默认ForkJoinPool.commonPool()执行,值得注意的是,commonPool中都是守护线程,主线程执行完,子线程也就over了。因此建议当任务非常耗 时,使用自定义线程池。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//无返回值
public static void runAsyncTest() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
});

future.get();
}

//有返回值
public static void supplyAsyncTest() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return System.currentTimeMillis();
});

long time = future.get();
System.out.println("time = " + time);
}

主动计算

以下4个方法用于获取结果

1
2
3
4
5
6
7
8
9
10
11
//同步获取结果
//该方法时阻塞方法,会等待计算结果完成
public T get()
//有时间限制的阻塞方法
public T get(long timeout, TimeUnit unit)
//立即获取方法结果,如果没有计算结束则返回传的值
public T getNow(T valueIfAbsent)
//和 get() 方法类似也是主动阻塞线程,等待计算结果。和get() 方法有细微的差别
public T join()
//立即完成计算,并把结果设置为传的值,返回是否设置成功
public boolean complete(T value)

getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。join()get()区别在于join()返回计算的结果或者抛出一个unchecked异常(CompletionException),而get()返回一个具体的异常.

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void completableFutureGet() throws  Exception{
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100 * 10;
});
System.out.println(completableFuture.get());
System.out.println(completableFuture.get(1000, TimeUnit.MICROSECONDS));
System.out.println(completableFuture.join());
System.out.println(completableFuture.getNow(1));
}

主动触发计算

如果 CompletableFuture 没有关联任何的Callback、异步任务等,如果调用get方法,那会一直阻塞下去,可以使用complete方法主动完成计算。

1
2
public boolean complete(T  value)
public boolean completeExceptionally(Throwable ex)

​ 上面方法表示当调用CompletableFuture.get()被阻塞的时候,那么这个方法就是结束阻塞,并且get()获取设置的value.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static CompletableFuture<Integer> compute() {
final CompletableFuture<Integer> future = new CompletableFuture<>();
return future;
}
public static void main(String[] args) throws Exception {
final CompletableFuture<Integer> f = compute();
class Client extends Thread {
CompletableFuture<Integer> f;
Client(String threadName, CompletableFuture<Integer> f) {
super(threadName);
this.f = f;
}
@Override
public void run() {
try {
System.out.println(this.getName() + ": " + f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
new Client("Client1", f).start();
new Client("Client2", f).start();
System.out.println("waiting");
//设置Future.get()获取到的值
f.complete(100);
//以异常的形式触发计算
//f.completeExceptionally(new Exception());
Thread.sleep(1000);
}

多任务依赖执行

在异步编程中,有时会涉及到异步任务间存在依赖关系,如第二个任务的执行需要依赖与第一个任务的执行结果,对于这种需求,CompletableFuture中也提供了方法实现

thenCompose

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

1
2
3
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

​ 以上接收类型为 Function<? super T,? extends CompletionStage> fn ,fn 接收上一级返回的结果,并返回一个新的 CompletableFuture

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static ExecutorService executorService = Executors.newFixedThreadPool(4);

private static void thenComposeTest() throws Exception {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
System.out.println("t1=" + t);
return t;
}, executorService).thenCompose((x) -> {
return CompletableFuture.supplyAsync(() -> {
int t = x * 2;
System.out.println("t2=" + t);
return t;
});
});
System.out.println("thenCompose result : " + f.get());
}

thenApply

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

​ CompletableFuture 由于有回调,可以不必等待一个计算完成而阻塞着调用线程,可以在一个结果计算完成之后紧接着执行某个Action。我们可以将这些操作串联起来。

1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

​ 首先说明一下已Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行,下文中将会有好多类似的,都不详细解释了。关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。

​ 和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。

参数解释

Function<? super T,? extends U>

  • T:上一个任务返回结果的类型
  • U:当前任务的返回值类型
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static ExecutorService executorService = Executors.newFixedThreadPool(4);

private static void thenApplyTest() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
long result = new Random().nextInt(100);
System.out.println("result1=" + result);
//如果出现异常 thenApply 将不会被执行
//int n = 1 / 0;
return result;
}, executorService).thenApply((x) -> {
long result = x * 5;
System.out.println("result2=" + result);
return result;
});

long result = future.get();
System.out.println(result);
}

第二个任务依赖第一个任务的结果。

这些方法不是马上执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。

注意:如果出现异常thenApply将不会被执行

handle

handle 是执行任务完成时对结果的处理。

​ handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

1
2
3
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

​ 与whenComplete()不同的是这个函数返回CompletableFuture并不是原始的CompletableFuture返回的值,而是BiFunction返回的值,handle 方法和whenComplete方法类似,只不过接收的是一个 BiFunction<? super T,Throwable,? extends U> fn 类型的参数,因此有 whenComplete 方法和 转换的功能 (thenApply)

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static ExecutorService executorService = Executors.newFixedThreadPool(4);

public static void handleTest() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 0;
return new Random().nextInt(10);
}, executorService).handle((x, e) -> {
int result = -1;
if (e == null) {
result = x * 2;
} else {
System.out.println(e.getMessage());
}
return result;
});
System.out.println(future.get());
}

​ 从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。

thenAccept

接收任务的处理结果,并消费处理,无返回结果。

1
2
3
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

​ 当将多个任务连接起来执行时,有时最终是不需要返回结果,CompletableFuture中也提供了方法实现,thenAccept()使用与上述方法类似,接收任务执行结果,并使用,但其没有结果返回。

示例
1
2
3
4
5
6
7
8
public static void thenAcceptTest() throws Exception {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}).thenAccept(x -> {
System.out.println(x);
});
future.get();
}

​ 从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。

thenRun

跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept ,thenRun它的入参是一个Runnable的实例,表示当得到上一步的结果时的操作。

1
2
3
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

​ thenRun()与thenAccept()使用基本相同,都是不会进行结果返回,但不同的是,thenRun()不关心方法是否有结 果,只要它完成,就会触发其执行。

示例
1
2
3
4
5
6
7
8
public static void thenRunTest() throws Exception {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}).thenRun(() -> {
System.out.println("thenRun ...");
});
future.get();
}

​ 该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。

异步计算结果触发回调

whenComplete

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

1
2
3
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

​ 上面4个方法是当计算阶段结束的时候触发,BiConsumer有两个入参,分别代表计算返回值,另外一个是异常.无返回值.方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

whenComplete 和 whenCompleteAsync 的区别

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void whenCompleteTest() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
}
System.out.println("run end ...");
return 111;
});

future.whenComplete((x, e) -> {
System.out.println("执行完成!" + e.getMessage());
});
TimeUnit.SECONDS.sleep(2);
//如果出现异常打印不出来结果的
System.out.println("执行结果:" + future.get());
}

exceptionally

​ exceptionally()与上述两个方法类似,都是用于当异步任务结束后,执行特定处理,但不同的是,上述两个方法即 可以处理正常的返回结果也可以处理异常,而exceptionally()只对异常进行处理,且其使用的是主线程。

1
2
//该方法是对异常情况的处理,当函数异常时应该的返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
示例

exceptionally比较复杂,需要通过4个实例才能真正明白

示例1

这段代码,由于会抛出异常,会先走whenCompleteAsync,然后再走exceptionally,而且是无法获取到返回值的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void exceptionallyTest1() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
}
return 111;
});

future.whenCompleteAsync((result, exception) -> {
System.out.println("计算已执行完毕,result:" + result);
System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getClass()));
}).exceptionally(exception -> {
System.out.println("计算执行过程中发生了异常,exception:" + exception.getClass());
return 333;
});

System.out.println("执行到最后一段代码了,future result:" + future.get());
}

执行结果

1
2
3
............
计算已执行完毕,exception:class java.util.concurrent.CompletionException
计算执行过程中发生了异常,exception:class java.util.concurrent.CompletionException
示例2

这里的打印结果是和上面类似的,可是为什么这次要获取新的CompletableFuture对象呢?看下面的exceptionally实例3后,再回来对比吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void exceptionallyTest2() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
}
return 111;
});

CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> {
System.out.println("计算已执行完毕,result:" + result);
System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getMessage()));
});

CompletableFuture<Integer> future3 = future2.exceptionally(exception -> {
System.out.println("计算执行过程中发生了异常,exception:" + exception.getMessage());
return 333;
});

System.out.println("执行到最后一段代码了,future result:" + future.get());
//因为上面的执行过程中,已经抛出了异常了,那么下面的这两段代码是无法执行到的,
System.out.println("执行到最后一段代码了,future2 result:" + future2.get());
System.out.println("执行到最后一段代码了,future3 result:" + future3.get());

}

执行结果

1
2
3
计算已执行完毕,result:null
计算已执行完毕,exception:java.lang.ArithmeticException: / by zero
计算执行过程中发生了异常,exception:java.lang.ArithmeticException: / by zero
示例3

这因为如果抛出了异常,future的get方法是执行不到的;而如果没有抛出异常的话,还是会返回原始的CompletableFuture的值的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void exceptionallyTest3() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
}
return 111;
});

CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> {
System.out.println("计算已执行完毕,result:" + result);
System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getMessage()));
});

CompletableFuture<Integer> future3 = future2.exceptionally(exception -> {
System.out.println("计算执行过程中发生了异常,exception:" + exception.getMessage());
//这里的返回值实际其是没有用处的。因为如果抛出了异常,future的get方法是执行不到的;而如果没有抛出异常的话,还是会返回原始的CompletableFuture的值的
//所以这个exceptionally就是仅仅用来处理异常的。
return 333;
});

//System.out.println("执行到最后一段代码了,future result:" + future.get());
//System.out.println("执行到最后一段代码了,future2 result:" + future2.get());
//和上面实例2唯一的区别就是注释掉了上面两段代码,但是执行结果却不一样了,而且整个main方法都没有抛出来异常,原因就在于future和future2是异步执行的,所以是在别的线程抛了异常,而main方法是不会抛出来的。而且在获取future3的结果时,可以发现,返回了future3对象自定义的返回值
System.out.println("执行到最后一段代码了,future3 result:" + future3.get());

}

执行结果

1
2
3
4
计算已执行完毕,result:null
计算已执行完毕,exception:java.lang.ArithmeticException: / by zero
计算执行过程中发生了异常,exception:java.lang.ArithmeticException: / by zero
执行到最后一段代码了,future3 result:333
示例4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void exceptionallyTest4() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return 111;
});

CompletableFuture<Integer> future2 = future.whenCompleteAsync((result, exception) -> {
System.out.println("计算已执行完毕,result:" + result);
System.out.println("计算已执行完毕,exception:" + (exception == null ? "无异常" : exception.getMessage()));
});

CompletableFuture<Integer> future3 = future2.exceptionally(exception -> {
System.out.println("计算执行过程中发生了异常,exception:" + exception.getMessage());
return 0;
});

System.out.println("执行到最后一段代码了,future result:" + future.get());
System.out.println("执行到最后一段代码了,future2 result:" + future2.get());
//原始的计算逻辑不变,exceptionally返回的新的CompletableFuture对象的结果和原始计算逻辑返回的结果一致。
System.out.println("执行到最后一段代码了,future3 result:" + future3.get());

}

执行结果

1
2
3
4
5
执行到最后一段代码了,future result:111
计算已执行完毕,result:111
计算已执行完毕,exception:无异常
执行到最后一段代码了,future2 result:111
执行到最后一段代码了,future3 result:111

两个任务全部完成触发

在进行多异步任务执行时,有时不光要让任务之间串联执行,有时还要将多个任务执行结果进行合并处理, CompletableFuture中也提供了一些方法实现。

thenCombine

当两个异步任务都执行完毕后,它可以将两个任务进行合并,获取到两个任务的执行结果,进行合并处理,最后会 有返回值。

1
2
3
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

​ 两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。

​ thenCombine 和 supplyAsync 不一定哪个先哪个后,是并行执行的。

示例
1
2
3
4
5
6
7
8
9
10
11
12
private static void thenCombineTest() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "word";
});
CompletableFuture<String> result = future1.thenCombine(future2, (x, y) -> {
return x + " " + y;
});
System.out.println(result.get());
}

thenAcceptBoth

当两个CompletableFuture都执行完成后,把结果一块交给thenAcceptBoth来进行处理

thenAcceptBoth()使用与thenCombine()类似,当两个任务执行完,获取两个任务的结果进行特定处理,但 thenAcceptBoth()没有返回值

1
2
3
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

​ thenAccept 相比,参数类型多了一个 CompletionStage<? extends U> other,以上方法会接收上一个CompletionStage返回值,和当前的一个。

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void thenAcceptBothTest() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
CompletableFuture future = f1.thenAcceptBoth(f2, (x, y) -> {
System.out.println("f1=" + x + ";f2=" + y + ";");
});
System.out.println(future.get());
}

runAfterBoth

当两个任务执行完毕,触发特定任务处理,但不要两个异步任务结果,且不会进行值返回。

runAfterBoth 和以上方法不同,传一个 Runnable 类型的参数,不接收上一级的返回值

并且两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)

1
2
3
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void runAfterBothTest() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
CompletableFuture future = f1.runAfterBoth(f2, () -> {
System.out.println("上面两个任务都执行完成了。");
});
System.out.println(future.get());
}

两个任务任意一个完成触发

applyToEither

总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。

两个CompletionStage,谁执行返回的结果快,我就用那个CompletableFuture的结果进行下一步的转化操作。

1
2
3
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static void applyToEitherTest() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});

CompletableFuture<Integer> result = f1.applyToEither(f2, (x) -> {
System.out.println(x);
return x * 2;
});

System.out.println(result.get());
}

acceptEither

acceptEither()的使用效果与applyToEither()类似,但acceptEither()没有返回值

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

1
2
3
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static void acceptEitherTest() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
f1.acceptEither(f2, (x) -> {
System.out.println(x);
});
}

runAfterEither

当两个任务执行,只要有一个任务执行完,则触发特定处理执行,无需使用异步任务的执行结果,且特定处理不会 进行值的返回。

两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)

1
2
3
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static void runAfterEitherTest() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
f1.runAfterEither(f2, () -> {
System.out.println("上面有一个已经完成了。");
});
}

多任务组合执行

刚才的操作异步任务的数量,只能局限在两个,现在如果需要有任意多个异步任务进行组合操作的话, CompletableFuture中也提供了对应方法进行实现

allOf

这个方法的意思是把有方法都执行完才往下执行,没有返回值

1
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void complateFuturAllOf() throws Exception {
Random random = new Random();
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
}),
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(2);
}))
.get();
}

anyOf

任务一个方法执行完都往下执行,返回一个Object类型的值

1
public static CompletableFuture anyOf(CompletableFuture<?>… cfs)
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void complateFuturAnyOf() throws Exception {
Random random = new Random();
Object o = CompletableFuture.anyOf(
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(random.nextInt(4000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
}),
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(2);
}))
.get();
System.out.println(o);
}

评论