java并发工具类-Fork-Join
Fork-Join
java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序,
Fork-Join 是什么
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
分而治之
“分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采取了分而治之的思想。简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始的1000个数据的处理结果。
同时forkjoin在处理某一类问题时非常的有用,哪一类问题?分而治之的问题。十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、
深度优先、广度优先、Dijkstra、动态规划、朴素贝叶斯分类,有几个属于分而治之?3个,快速排序、归并排序、二分查找,还有大数据中M/R都是。
分治法的设计思想是:将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。
分治策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。
归并排序
归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。
若将两个有序表合并成一个有序表,称为2-路归并,与之对应的还有多路归并。
对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。
为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。
Fork-Join原理
Fork/Join框架要完成两件事情:
任务分割
Fork/Join框架的基本思想就是将一个大任务分解(Fork)成一系列子任务,子任务可以继续往下分解,当多个不同的子任务都执行完成后,可以将它们各自的结果合并(Join)成一个大结果,最终合并成大任务的结果:
ForkJoinTask
基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类
- RecursiveAction : 无结果返回的任务
- RecursiveTask : 有返回结果的任务
说明:
fork : 让task异步执行
join : 让task同步执行,可以获取返回值
ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行
结果合并
ForkJoinPool 执行 ForkJoinTask
- 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
- 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
三中提交方式:
execute: 异步,无返回结果
submit :异步,有返回结果 (返回
Future<T>
)invoke :同步,有返回结果 (会阻塞)
工作密取
即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。
ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。
Fork/Join使用
我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。
RecursiveAction,用于没有返回结果的任务
RecursiveTask,用于有返回值的任务
task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行。
join()和get方法当任务完成的时候返回计算结果。
在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。
Fork/Join的使用
1 | public class SubmitTask extends RecursiveTask<Long> { |
运行结果
forkJoin调用:result:50005000
forkJoin调用耗时:2286
普通调用:result:50005000
普通调用耗时:17038
Fork/Join 同步用法
同步用法就是将初始化的任务扔进连接池,如果没有执行完成会阻塞
forkJoinPool.invoke(submitTask);
1 | /** |
打进结果
同步方式,任务结束才会调用该方法,当前耗时2367
任务执行完成,当前耗时:2368
forkJoin调用:result:50005000
forkJoin调用耗时:2368
Fork/Join 异步用法
异步用法就是将初始化的任务扔进连接池,然后继续其他任务
forkJoinPool.submit(submitTask);
1 | /** |
打印结果
异步方式,任务结束才会调用该方法,当前耗时3
任务执行完成,当前耗时:2315
forkJoin调用:result:50005000
forkJoin调用耗时:2315
总结
关于ForkJoinPool
可以使用ForkJoinPool.execute(异步,不返回结果)/invoke(同步,返回结果)/submit(异步,返回结果)方法,来执行ForkJoinTask。
ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。
- 文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”,
(Runtime.getRuntime().availableProcessors() - 1)
- ForkJoinTask自己启动时,使用的就是这个静态实例。
- 文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”,
关于ForkJoinTask
可以使用invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(是同步的)
还可以使用fork方法,让一个task执行(这个方法是异步的)
还可以使用join方法,让一个task执行(这个方法是同步的,它和fork不同点是同步或者异步的区别)
可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。
- get()和join()有两个主要的区别:
- join()方法不能被中断。
- 如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。
- 如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。
- get()和join()有两个主要的区别:
ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。
使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。
ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。
- 他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。
看看ForkjoinTask的Complete方法的使用场景
这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。Task的completeExceptionally方法是怎么回事。
- 这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务
这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。 - 当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。
- 这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务