抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

java并发工具类-Fork-Join

Fork-Join

​ java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序,

Fork-Join 是什么

​ Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

分而治之

​ “分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采取了分而治之的思想。简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始的1000个数据的处理结果。

​ 同时forkjoin在处理某一类问题时非常的有用,哪一类问题?分而治之的问题。十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、

深度优先、广度优先、Dijkstra、动态规划、朴素贝叶斯分类,有几个属于分而治之?3个,快速排序、归并排序、二分查找,还有大数据中M/R都是。

分治法的设计思想是:将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。

分治策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。

归并排序

​ 归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。

若将两个有序表合并成一个有序表,称为2-路归并,与之对应的还有多路归并。

​ 对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。

​ 为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。

Fork-Join原理

Fork/Join框架要完成两件事情:

任务分割

​ Fork/Join框架的基本思想就是将一个大任务分解(Fork)成一系列子任务,子任务可以继续往下分解,当多个不同的子任务都执行完成后,可以将它们各自的结果合并(Join)成一个大结果,最终合并成大任务的结果:

ForkJoinTask

基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务
说明:
  • fork : 让task异步执行

  • join : 让task同步执行,可以获取返回值

  • ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

结果合并

ForkJoinPool 执行 ForkJoinTask
  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
三中提交方式:
  • execute: 异步,无返回结果

  • submit :异步,有返回结果 (返回Future<T>

  • invoke :同步,有返回结果 (会阻塞)

工作密取

​ 即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。

ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

Fork/Join使用

​ 我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。

  1. RecursiveAction,用于没有返回结果的任务

  2. RecursiveTask,用于有返回值的任务

task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行。

join()和get方法当任务完成的时候返回计算结果。

​ 在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

Fork/Join的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class SubmitTask extends RecursiveTask<Long> {
/**
* 起始值
*/
private long start;
/**
* 结束值
*/
private long end;
/**
* 阈值
*/
private long threshold = 10L;

public SubmitTask(long start, long end) {
this.start = start;
this.end = end;
}

/**
* 计算逻辑
*
* @return
*/
@Override
protected Long compute() {
//校验是否达到了阈值
if (isLessThanThreshold()) {
//处理并返回结果
return handle();
} else {
//没有达到阈值 计算一个中间值
long mid = (start + end) / 2;
//拆分 左边的
SubmitTask left = new SubmitTask(start, mid);
//拆分右边的
SubmitTask right = new SubmitTask(mid + 1, end);
//添加到任务列表
invokeAll(left, right);
//合并结果并返回
return left.join() + right.join();
}
}

/**
* 处理的任务
*
* @return
*/
public Long handle() {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return sum;
}

/*是否达到了阈值*/
private boolean isLessThanThreshold() {
return end - start <= threshold;
}

/**
* forkJoin 方式调用
*
* @param start
* @param end
*/
public static void forkJoinInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
//创建ForkJoinPool 连接池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建初始化任务
SubmitTask submitTask = new SubmitTask(start, end);
//讲初始任务扔进连接池中执行
forkJoinPool.invoke(submitTask);
//等待返回结果
sum = submitTask.join();
System.out.println("forkJoin调用:result:" + sum);
System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
}

/**
* 普通方式调用
*
* @param start
* @param end
*/
public static void normalInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
for (long i = start; i <= end; i++) {
sum += i;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("普通调用:result:" + sum);
System.out.println("普通调用耗时:" + (System.currentTimeMillis() - currentTime));
}

public static void main(String[] args) {
//起始值的大小
long start = 0;
//结束值的大小
long end = 10000;
//forkJoin 调用
forkJoinInvok(start, end);
System.out.println("========================");
//普通调用
normalInvok(start, end);
}
}

运行结果

forkJoin调用:result:50005000

forkJoin调用耗时:2286


普通调用:result:50005000
普通调用耗时:17038

Fork/Join 同步用法

同步用法就是将初始化的任务扔进连接池,如果没有执行完成会阻塞

forkJoinPool.invoke(submitTask);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 /**
* forkJoin 方式调用
*
* @param start
* @param end
*/
public static void forkJoinInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
//创建ForkJoinPool 连接池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建初始化任务
SubmitTask submitTask = new SubmitTask(start, end);
//讲初始任务扔进连接池中执行 同步用法
forkJoinPool.invoke(submitTask);
System.out.println("同步方式,任务结束才会调用该方法,当前耗时"+(System.currentTimeMillis() - currentTime));
//等待返回结果
sum = submitTask.join();
System.out.println("任务执行完成,当前耗时:"+(System.currentTimeMillis() - currentTime));
System.out.println("forkJoin调用:result:" + sum);
System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
}

打进结果

同步方式,任务结束才会调用该方法,当前耗时2367
任务执行完成,当前耗时:2368
forkJoin调用:result:50005000
forkJoin调用耗时:2368

Fork/Join 异步用法

异步用法就是将初始化的任务扔进连接池,然后继续其他任务

forkJoinPool.submit(submitTask);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* forkJoin 方式调用
*
* @param start
* @param end
*/
public static void forkJoinInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
//创建ForkJoinPool 连接池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建初始化任务
SubmitTask submitTask = new SubmitTask(start, end);
//讲初始任务扔进连接池中执行 异步方式
forkJoinPool.submit(submitTask);
System.out.println("异步方式,任务结束才会调用该方法,当前耗时"+(System.currentTimeMillis() - currentTime));
//等待返回结果
sum = submitTask.join();
System.out.println("任务执行完成,当前耗时:"+(System.currentTimeMillis() - currentTime));
System.out.println("forkJoin调用:result:" + sum);
System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
}

打印结果

异步方式,任务结束才会调用该方法,当前耗时3
任务执行完成,当前耗时:2315
forkJoin调用:result:50005000
forkJoin调用耗时:2315

总结

关于ForkJoinPool

  • 可以使用ForkJoinPool.execute(异步,不返回结果)/invoke(同步,返回结果)/submit(异步,返回结果)方法,来执行ForkJoinTask。

  • ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。

    • 文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”,(Runtime.getRuntime().availableProcessors() - 1)
    • ForkJoinTask自己启动时,使用的就是这个静态实例。

关于ForkJoinTask

  • 可以使用invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(是同步的)

  • 还可以使用fork方法,让一个task执行(这个方法是异步的)

  • 还可以使用join方法,让一个task执行(这个方法是同步的,它和fork不同点是同步或者异步的区别)

  • 可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。

    • get()和join()有两个主要的区别:
      • join()方法不能被中断。
      • 如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。
      • 如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。
  • ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。

  • 使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。

  • ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。

    • 他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。
  • 看看ForkjoinTask的Complete方法的使用场景
    这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。

  • Task的completeExceptionally方法是怎么回事。

    • 这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务
      这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。
    • 当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。

评论