抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

JAVA线程池实现02-提交任务

submit提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 提交一个带有返回值的任务
* @param task 任务
* @param result 结果
* @param <T> 泛型
* @return Future
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
//调用execute进行执行
execute(ftask);
return ftask;
}

/**
* 创建一个FutureTask
* @param runnable 运行的任务
* @param value 返回结果
* @param <T> 泛型
* @return FutureTask
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

流程步骤如下

  1. 调用submit方法,传入Runnable或者Callable对象
  2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  3. 将传入的对象转换为RunnableFuture对象
  4. 执行execute方法,传入RunnableFuture对象
  5. 返回RunnableFuture对象

execute 执行线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 在未来执行任务
* 任务将新建或者现有的线程池中执行
* 如果线程池关闭或者线程池满了将执行拒绝策略
* @param command
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
/**
* 1、运行线程数少于核心线程数,则调用addWorker启动一个新的线程
* 需要检查否应该添加线程
*/
if (workerCountOf(c) < corePoolSize) {
//添加线程
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
/**
* 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。
* 运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。
* 但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务;
*/
//线程运行状态,并且添加进队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//线程未运行并且删除成功
if (! isRunning(recheck) && remove(command))
//拒绝任务
reject(command);
//线程正在运行中
else if (workerCountOf(recheck) == 0)
//添加任务
addWorker(null, false);
}
/**
* 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。
* 注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。
*/
else if (!addWorker(command, false))
//拒绝任务
reject(command);
}

其实从上面代码注释中可以看出就三个判断,

  1. 核心线程数是否已满
  2. 队列是否已满
  3. 线程池是否已满

  1. 调用execute方法,传入Runable对象
  2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  3. 获取当前线程池的状态和线程个数变量
  4. 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
  5. 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
  6. 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
  7. 重新获取当前线程池的状态和线程个数变量
  8. 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
  9. 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
  10. 调用!addWorker(command, false),为true走流程11,false则结束
  11. 调用拒绝策略reject(command),结束

addWorker 增加工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/**
* 添加工作线程
* @param firstTask 任务
* @param core 是否是核心线程
* @return
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//自旋
for (;;) {
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);

// 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
// 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null)
// 条件都成立则返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())) {
return false;
}
//有一个自旋
for (;;) {
//获取工作线程数
int wc = workerCountOf(c);
/**
* 工作线程数 >= 队列容量 返回fasle
* 如果是核心线程 工作线程数>=核心线程数 返回false
* 如果不是核心线程 工作线程数>=最大线程数 返回false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
//CAS增加c,成功则跳出retry
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
c = ctl.get(); // Re-read ctl
//CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环
if (runStateOf(c) != rs) {
continue retry;
}
// else CAS failed due to workerCount change; retry inner loop
}
}
//CAS成功

//工作线程状态
boolean workerStarted = false;
//工作线程添加状态
boolean workerAdded = false;
Worker w = null;
try {
//创建一个工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//获取重入锁
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//重新检查线程池状态
//避免ThreadFactory退出故障或者在锁获取前线程池被关闭
int rs = runStateOf(ctl.get());
//再次检查线程池状态 ???
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//检查thread的状态
if (t.isAlive()) { // precheck that t is startable
throw new IllegalThreadStateException();
}
//任务列表添加任务
workers.add(w);
//获取任务列表大小
int s = workers.size();
//最大线程数 计数
if (s > largestPoolSize) {
largestPoolSize = s;
}
//线程添加成功
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为true
if (workerAdded) {
t.start();
//启动状态成功
workerStarted = true;
}
}
} finally {
//判断线程有没有启动成功,没有则调用addWorkerFailed方法
if (! workerStarted) {
addWorkerFailed(w);
}
}
//返回任务启动状态
return workerStarted;
}

这里可以将addWorker分为两部分,第一部分增加线程池个数,第二部分是将任务添加到workder里面并执行。

第一部分主要是两个循环,外层循环主要是判断线程池状态

1
2
3
4
rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())

展开!运算后等价于

1
2
3
4
s >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty())

也就是说下面几种情况下会返回false:

  • 当前线程池状态为STOP,TIDYING,TERMINATED
  • 当前线程池状态为SHUTDOWN并且已经有了第一个任务
  • 当前线程池状态为SHUTDOWN并且任务队列为空

内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。

到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。

所以这里也将流程图分为两部分来描述

第一部分流程图

第二部分流程图

这里面有一个核心的工作类 Worker

AQS的Worker工作任务

这个类继承了抽象队列同步器 是标准的AQS线程安全的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/**
* 工作任务对象
* 继承了AQS 抽象队列同步器 以及 Runnable 接口
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/**
* Thread this worker is running in. Null if factory fails.
*/
//正在运行的线程,工厂创建线程失败则为null
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
//运行的初始任务,可能为null
Runnable firstTask;
/**
* Per-thread task counter
*/
//完成任务的计数器
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
//构造方法
Worker(Runnable firstTask) {
//设置状态为未运行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//使用线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}

/**
* Delegates main run loop to outer runWorker
*/
//实现Runnable的run方法
@Override
public void run() {
//运行任务方法
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

/*是否是独占的
* @return 0 未锁 1 已锁定
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}

/**
* 尝试获取占用权
* @param unused
* @return
*/
protected boolean tryAcquire(int unused) {
//CAS 设置锁定状态
if (compareAndSetState(0, 1)) {
//设置持有者是当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/**
* 尝试释放锁
* @param unused
* @return
*/
protected boolean tryRelease(int unused) {
//设置是持有者为null
setExclusiveOwnerThread(null);
//设置锁定状态为 未锁定
setState(0);
return true;
}

/**
* 加锁
*/
public void lock() {
acquire(1);
}

/**
* 尝试获取锁
* @return
*/
public boolean tryLock() {
return tryAcquire(1);
}
//释放锁
public void unlock() {
release(1);
}

/**
* 释放
* @return
*/
public boolean isLocked() {
return isHeldExclusively();
}

/**
* 中断启动
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

这个类很值得学习,里面最核心的方法是 runWorker 方法

runWorker方法

运行任务的主体,通过循环从阻塞队列中拿任务,进行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 运行任务
* @param w 任务
*/
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取任务 task
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
//是否突然完成任务(异常,或者其他情况)
boolean completedAbruptly = true;
try {
//循环获取任务
while (task != null || (task = getTask()) != null) {
//加锁
w.lock();

// 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态
// 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态
// 重新检查当前线程池的状态是否大于等于STOP状态
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
wt.interrupt();
}
try {
//线程执行前执行一些任务,在ThreadPoolExecutor是空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//完成任务后执行一些任务,在ThreadPoolExecutor是空实现
afterExecute(task, thrown);
}
} finally {
//完成任务task置为空,交给GC处理
task = null;
//完成任务计数器+1
w.completedTasks++;
//解锁
w.unlock();
}
}
/**
* 正常完成任务为false
* 否则completedAbruptly 为true
*/
completedAbruptly = false;
} finally {
//整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作
processWorkerExit(w, completedAbruptly);
}
}

这里面有两个核心方法

  • getTask:从队列中获取任务
  • processWorkerExit:处任务并退出

我们先从getTask开始

getTask 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 /**
* 获取待执行的任务
*
* @return
*/
private Runnable getTask() {
//最后一次poll()是否超时
boolean timedOut = false;
//自旋
for (; ; ) {
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);

//线程不在运行状态并且队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//使用CAS进行工作任务数-1
decrementWorkerCount();
return null;
}
//获取当前工作任务数
int wc = workerCountOf(c);

/**
* 是否进行任务淘汰 如果 allowCoreThreadTimeOut为true 就一直淘汰下去
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(当前线程数是否大于最大线程数或者)
//且(线程数大于1或者任务队列为空)
//这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//CAS方式进行工作线程-1
if (compareAndDecrementWorkerCount(c)) {
return null;
}
continue;
}

try {
/**
* 如果需要淘汰淘汰从工作先队列中在指定keepAliveTime时间内获取一个工作线程否则返回null
* 否则工作线程池为空就一直等待
*/
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null) {
return r;
}
//如果获取超时设置超时时间为true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

接下来我们分析下processWorkerExit方法

processWorkerExit 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 处理完成后续的线程统计工作
* 删除完成工作的线程
* @param w 工作线程
* @param completedAbruptly 是否突然完成(异常情况)
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果突然完成,工作线程数统计未统计
if (completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
//重新对工作线程数-1
decrementWorkerCount();
}
//获取锁
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//完成任务数统计
completedTaskCount += w.completedTasks;
//从工作任务队列删除队列
workers.remove(w);
} finally {
//解锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();

int c = ctl.get();
//正在运行或者停止
if (runStateLessThan(c, STOP)) {
//没有突然完成
if (!completedAbruptly) {
// 计算最小工作线程,如果allowCoreThreadTimeOut为true 就是 0 否则就是核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果最小线程为0并且工作任务队列不为空则设置最小线程数为1
if (min == 0 && !workQueue.isEmpty()) {
min = 1;
}
//如果工作线程数>=最小线程数返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

到这里为止,submit 和 execute已经分析完成了。

评论