抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

java并发工具类-Callable、Future 和FutureTask

前言

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。

这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。
如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了CallableFuture,通过它们可以在任务执行完毕之后得到任务执行结果

Callable接口

Callable位于JUC包下,它也是一个接口,在它里面也只声明了一个方法叫做call():

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

​ Callable接口代表一段可以调用并返回结果的代码。

​ Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

​ Callable接口使用泛型去定义它的返回类型。

Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。
java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了**get()**方法让我们可以等待Callable结束并获取它的执行结果。

那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一个方法:submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。

第二个方法:submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。

第三个方法:submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。

因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。

Future接口

​ Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果

​ Future接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法,下面依次解释每个方法的作用

方法 作用
cance(boolean mayInterruptIfRunning) 试图取消执行的任务,参数为true时直接中断正在执行的任务,否则直到当前任务执行完成,成功取消后返回true,否则返回false
isCancelled() 方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone() 方法表示任务是否已经完成,若任务完成,则返回true;
get() 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit) 设定计算结果的返回时间,如果在规定时间内没有返回计算结果则抛出TimeOutException

Future提供了三种功能

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

RunnableFuture接口

RunnableFuture实现了Runnable和Future。因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。

1
2
3
4
5
6
7
8
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask

我们先来看一下FutureTask的实现:

1
public class FutureTask<V> implements RunnableFuture<V> 

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现

1
public interface RunnableFuture<V> extends Runnable, Future<V>

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

分析

FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。接下来我们根据FutureTask.run()的执行时机来分析其所处的3种状态:

  1. 未启动,FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态,当创建一个FutureTask,而且没有执行FutureTask.run()方法前,这个FutureTask也处于未启动状态。

  2. 已启动,FutureTask.run()被执行的过程中,FutureTask处于已启动状态。

  3. 已完成,FutureTask.run()方法执行完正常结束,或者被取消或者抛出异常而结束,FutureTask都处于完成状态。

下面我们再来看看FutureTask的方法执行示意图(方法和Future接口基本是一样的,这里就不过多描述了)

  • 当FutureTask处于未启动或已启动状态时,如果此时我们执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。

  • 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行。当FutureTask处于已启动状态时,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功,cancel(…)返回true;但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(…)返回false。当任务已经完成,执行cancel(…)方法将返回false。

最后我们给出FutureTask的两种构造函数:

1
2
3
4
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

事实上,FutureTask是Future接口的一个唯一实现类。

使用创景

​ 通过上面的介绍,我们对Callable,Future,RunnableFuture,FutureTask都有了比较清晰的了解了,那么它们到底有什么用呢?我们前面说过通过这样的方式去创建线程的话,最大的好处就是能够返回结果,加入有这样的场景,我们现在需要计算一个数据,而这个数据的计算比较耗时,而我们后面的程序也要用到这个数据结果,那么这个时Callable岂不是最好的选择?我们可以开设一个线程去执行计算,而主线程继续做其他事,而后面需要使用到这个数据时,我们再使用Future获取不就可以了吗?下面我们就来编写一个这样的实例。

多任务计算

​ 利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。

​ 例如主线程执行到需要发邮件,发短信的环节,发送完成后才能继续下一次任务,对于发短信发邮件可以使用FutureTask进行多线程进行并行执行,通过get来阻塞返回结果,如果发邮件需要5s,发短信需要8s,传统的方式需要13s,使用FutureTask异步执行只需要8s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package chapter02.future;

import util.ThreadUtils;

import java.util.concurrent.*;

/**
* 发送消息的FutuerTask
* 将原来发送邮件发送短信的串行任务改为FutuerTask的并行任务提高CPU利用率以及速度
*/
public class FutuerSendMessage {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutuerSendMessage futuerSendMessage = new FutuerSendMessage();
futuerSendMessage.sendmessage();
}

public void sendmessage() throws ExecutionException, InterruptedException {
long currentTime = System.currentTimeMillis();
//发送邮件
Future<Boolean> sendMailFuture = executorService.submit(() -> {
return sendMail();
});
//发送短信
Future<Boolean> sendMsgFuture = executorService.submit(() -> {
return sendMsg();
});
System.out.println("提交任务到线程池,耗时:" + (System.currentTimeMillis() - currentTime));
System.out.println("主线程继续任务...");
//如果发送都成功了
if (sendMailFuture.get() && sendMsgFuture.get()) {
System.out.println("消息发送成功,耗时:" + (System.currentTimeMillis() - currentTime));
}
executorService.shutdown();
}

/**
* 发送邮件
*
* @return
*/
private boolean sendMail() {
//睡眠5s
ThreadUtils.sleep(5, TimeUnit.SECONDS);
System.out.println("发送邮件");
return true;
}

/**
* 发送短信
*
* @return
*/
private boolean sendMsg() {
//睡眠8s
ThreadUtils.sleep(8, TimeUnit.SECONDS);
System.out.println("发送短信");
return true;
}

//-----------------------------原始的代码---------------

public void orginSendmessage() {
long currentTime = System.currentTimeMillis();
boolean sendMail = sendMail();
boolean sendMsg = sendMsg();
if (sendMail && sendMsg) {
System.out.println("消息发送成功,耗时:" + (System.currentTimeMillis() - currentTime));
}
}
}

高并发环境下确保任务只执行一次

​ 在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个map的缓存,当key存在时,即直接返回key对应的对象;当key不存在时,则创创建一个对象。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和换粗你对象的对应关系,典型的代码如下面所示:

原始的方式如下

1
2
3
4
5
6
7
8
9
10
private Map<String, String> orginCacheMap = new HashMap<String, String>();
public synchronized String getOrginValue(String key) {
if (cacheMap.containsKey(key)) {
return orginCacheMap.get(key);
} else {
String cacheValue = createCache();
orginCacheMap.putIfAbsent(key, cacheValue);
return cacheValue;
}
}

​ 在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了缓存对象只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建缓存对象的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package chapter02.future;

import util.ThreadUtils;

import java.sql.Connection;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
* FutuerTask 多线程下只进行一次读取
*/
public class FutuerOnceExec {
private Map<String, String> orginCacheMap = new HashMap<String, String>();
private Map<String, Future<String>> cacheMap = new ConcurrentHashMap<String, Future<String>>();
private ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

public static void main(String[] args) {
FutuerOnceExec futuerOnceExec = new FutuerOnceExec();
long currentTime = System.currentTimeMillis();
String value = futuerOnceExec.getValue("xx");
System.out.println("value:" + value + ",第一次获取耗时:" + (System.currentTimeMillis() - currentTime));
value = futuerOnceExec.getValue("xx");
System.out.println("value:" + value + ",第二次获取耗时:" + (System.currentTimeMillis() - currentTime));
value = futuerOnceExec.getValue("xx");
System.out.println("value:" + value + ",第三次获取耗时:" + (System.currentTimeMillis() - currentTime));
}


/**
* 获取缓存数据
*
* @param key
* @return
*/
public String getValue(String key) {
Future<String> futureValue = cacheMap.get(key);
//如果已经存在Future了直接get 完成的Future是不会阻塞的
if (null != futureValue) {
return getFutureValue(futureValue);
} else {
//使用异步线程常见Future任务
futureValue = executorService.submit(() -> {
ThreadUtils.sleep(1, TimeUnit.SECONDS);
return createCache();
});
executorService.shutdown();
cacheMap.putIfAbsent(key, futureValue);
}
//获取future中的值
return getFutureValue(futureValue);
}

/**
* 获取 Future中的值
* get的时候可能会阻塞
*
* @param futureValue
* @return
*/
private String getFutureValue(Future<String> futureValue) {
try {
return futureValue.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}

private String createCache() {
//todo 业务代码
return "缓存对象";
}


//--------------------原始的方式

public synchronized String getOrginValue(String key) {
if (cacheMap.containsKey(key)) {
return orginCacheMap.get(key);
} else {
String cacheValue = createCache();
orginCacheMap.putIfAbsent(key, cacheValue);
return cacheValue;
}
}
}

总结

实现Runnable接口和实现Callable接口的区别:

  1. Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的。

  2. Callable规定的方法是call(),Runnable规定的方法是run()。

  3. Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。

  4. call方法可以抛出异常,run方法不可以。

  5. 运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。

  6. 加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法。

Callable、Runnable、Future和FutureTask 的区别

  1. Callable、Runnable、Future和FutureTask 做为java 线程池运行的重要载体,有必要深入理解。

  2. Callable 和 Runnable 都是执行的任务的接口,区别在于Callable有返回值,而Runnable无返回值。

  3. Future 表示异步任务返回结果的接口

  4. RunnableFuture 继承了Runnable, Future,表示可以带有返回值的run接口

  5. FutureTask是一个实现类,实现了RunnableFuture接口,既能接受Runnable类型的任务,也可以接受Callable类型的任务,这个类的作用主要是 有一个protected void done()方法用来扩展使用,作为一个回调方法

评论