抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

java并发工具类-线程池

什么是线程池

​ 之前我们在使用多线程都是用Thread的start()来创建启动一个线程,但是在实际开发中,如果每个请求到达就创建一个新线程,开销是相当大的。服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会使系统由于过度消耗内存或“切换过度”而导致系统资源不足。这就引入了线程池概念。

​ 线程池的原理其实就是对多线程的一个管理,为了实现异步机制的一种方法,其实就是多个线程执行多个任务,最终这些线程通过线程池进行管理不用手动去维护一次可以处理多个任务,这样就可以迅速的进行相应比如说一个网站成为了热点网站,那么对于大量的点击量,就必须要对每一次的点击做出迅速的处理,这样才能达到更好的交互效果,这样就需要多个线程去处理这些请求,以便能够更好的提供服务。

为什么使用线程池

​ 操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。

​ 大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的。这种技术背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程、切换线程状态、销毁线程这些操作所占用,用于业务请求处理的资源反而减少了。所以最理想的处理方式是,将处理请求的线程数量控制在一个范围,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。另外,一些操作系统是有最大线程数量限制的。当运行的线程数量逼近这个值的时候,操作系统会变得不稳定。这也是我们要限制线程数量的原因。

线程池的优点

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁带来的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性:使用线程池可以统一进行线程分配、调度和监控。
  • 线程统一管理:线程池具有创建线程和销毁线程的能力,线程集中在一起比起分散开来,更加便于管理

线程池可以应对突然大爆发量的访问,通过有限个固定线程为大量的操作服务,减少创建和销毁线程所需的时间。

线程池的分类和作用

线程池创建可以使用Executors进行创建一下是他的几种创建方式

newCachedThreadPool

创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。

  1. 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
  2. 线程池中的线程可进行缓存重复利用和回收(回收默认时间为1分钟)
  3. 当线程池中,没有可用线程,会重新创建一个线程

注意:线程数量较多的时候禁止使用因为没有设置最大线程数,会在高并发情况下出现线程数暴增。

1
ExecutorService executorService = Executors.newCachedThreadPool();

newFixedThreadPool

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 Threads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

  1. 线程池中的线程处于一定的量,可以很好的控制线程的并发量
  2. 线程可以重复被使用,在显示关闭之前,都将一直存在
  3. 超出一定量的线程被提交时候需在队列中等待

注意:如果任务是非常多禁止使用,因为使用了无界队列,没有任务的数量限制,在任务量特别大的情况下会出现内存暴增。

1
ExecutorService executorService = Executors.newFixedThreadPool(5);

newSingleThreadExecutor

创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

  1. 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行

注意:和和上面一样,如果任务是非常多禁止使用,因为使用了无界队列,没有任务的数量限制,在任务量特别大的情况下会出现内存暴增。

1
ExecutorService executorService = Executors.newSingleThreadExecutor();

newScheduleThreadPool

创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

  1. 线程池中具有指定数量的线程,即便是空线程也将保留
  2. 可定时或者延迟执行线程活动
1
ExecutorService executorService = Executors.newScheduledThreadPool(10);

newSingleThreadScheduledExecutor

创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

  1. 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行
  2. 可定时或者延迟执行线程活动

以上的集中线程池创建都可以使用Executors静态类来进行创建

1
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

为什么不允许使用Executors去创建线程池

阿里规范强制规定 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

说明: Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。

ThreadPoolExector

     在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。 

线程池的执行流程

这个也一般是面试官问到的问题

  1. 首先我们提交第一个任务到线程池,此时核心线程数都还没有用,所以会启动核心线程之一来执行任务,记住为了说明这个流程,我们的任务的占用时间都很长,所以短时间内不会结束;

  2. 接着提交第二个第三个任务到线程池,他们的执行逻辑同第一个任务是一模一样的,线程池会启动核心线程池中剩下的两个线程来执行你新提交的任务。

  3. 接着又有新的任务提交过来,这个时候线程池发现核心线程池中的线程已经都在工作中,所以会去看任务队列taskQueue是否满了,发现并没有,是空的,所以将这个任务放入任务队列中等待核心线程池中有空闲线程时自己来取任务执行。

  4. 接着又提交了4个任务到线程池,他们分别判断核心线程是否空闲,不空闲,然后判断任务队列是否已满,不满,则直接将任务放入队列;

  5. 接着新的任务又来,则在判断核心线程池和任务队列之后,发现任务依然没有办法处理,则会判断是否线程数达到最大,发现没有,则新启动线程来执行任务;

  6. 接着又来一个任务,执行流程同 5;

  7. 再来一个任务,发现核心线程池在忙,任务队列也满了,线程池中的全部线程也都在工作,没有办法处理他了,所以他找到了饱和策略,因为饱和策略是默认的抛异常,所以线程池会告诉提交任务的线程,已经没有可以用的线程了。

构造方法

我们看下ThreadPoolExector的原代码,有如下的一些构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 创建线程池
* @param corePoolSize 核心线程池大小
* @param maximumPoolSize 最大线程池大小
* @param keepAliveTime 空闲等待时间
* @param unit 时间单位
* @param workQueue 传入的阻塞队列
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
//调用重载的构造方法
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

/**
* 创建线程池
* @param corePoolSize 核心线程池大小
* @param maximumPoolSize 最大线程池大小
* @param keepAliveTime 空闲等待时间
* @param unit 时间单位
* @param workQueue 传入的阻塞队列
* @param threadFactory 线程工厂
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
///调用重载的构造方法
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

/**
* 创建线程池
* @param corePoolSize 核心线程池大小
* @param maximumPoolSize 最大线程池大小
* @param keepAliveTime 空闲等待时间
* @param unit 时间单位
* @param workQueue 传入的阻塞队列
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
///调用重载的构造方法
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}


/**
* 创建线程池
* @param corePoolSize 核心线程池大小
* @param maximumPoolSize 最大线程池大小
* @param keepAliveTime 空闲等待时间
* @param unit 时间单位
* @param workQueue 传入的阻塞队列
* @param threadFactory 线程工厂
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//条件校验,不满足抛出异常
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 阻塞队列,线程工厂,拒绝策略不允许为空
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//java安全模式
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

可以看出ThreadPoolExector一共有四个构造函数,但是最后调用的都是最后一个,我们可以只看最后一个,它主要有核心池大小、最大池大小、存活时间、时间单位、阻塞队列、线程工厂这几个参数,其中又对其进行了值范围的检查,如果参数违法就抛出异常,然后构造进去。

构造参数
参数名 作用
corePoolSize 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
maximumPoolSize 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
keepAliveTime 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
unit keepAliveTime的单位
workQueue 任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种
threadFactory 线程工厂,用于创建线程,一般用默认即可
handler 拒绝策略;当任务太多来不及处理时,如何拒绝任务

workQueue任务队列

它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列

直接提交队列

设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package chapter02.pool;

import java.util.concurrent.*;

public class SynchronousQueueThreadPool {
private static ExecutorService pool;

public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 3; i++) {
pool.execute(new ThreadTask("ThreadPool-" + i));
}
pool.shutdown();
}
}

​ 使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

有界的任务队列

有界的任务队列可以使用ArrayBlockingQueue实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package chapter02.pool;

import java.util.concurrent.*;

public class BoundedQueueThreadPool {
private static ExecutorService pool;

public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 3; i++) {
pool.execute(new ThreadTask("ThreadPool-" + i));
}
pool.shutdown();
}
}

​ 使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

无界的任务队列

有界任务队列可以使用LinkedBlockingQueue实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package chapter02.pool;

import java.util.concurrent.*;

public class UnboundedQueueThreadPool {
private static ExecutorService pool;

public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 3; i++) {
pool.execute(new ThreadTask("ThreadPool-" + i));
}
pool.shutdown();
}
}

​ 使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

优先任务队列

优先任务队列通过PriorityBlockingQueue实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package chapter02.pool;

import java.util.concurrent.*;

public class PriorityQueueThreadPool {
private static ExecutorService pool;

public static void main(String[] args) {
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 20; i++) {
pool.execute(new PriorityThreadTadk(i));
}
pool.shutdown();
}
}

​ 大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

​ 通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

拒绝策略

     一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下 
RejectedExecutionHandler 特性及效果
AbortPolicy 该策略会直接抛出异常,阻止系统正常工作
CallerRunsPolicy 如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行
DiscardOledestPolicy 该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交
DiscardPolicy 该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失

ThreadFactory自定义线程创建

     线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package chapter02.pool;

import java.util.concurrent.*;

/**
* 自定义线程创建工厂
*/
public class CustomThreadFactory {
private static ExecutorService pool;

public static void main(String[] args) {
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), getThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 20; i++) {
pool.execute(new ThreadTask("ThreadPool-" + i));
}
pool.shutdown();
}

/**
* 自定义线程工厂
*
* @return
*/
public static ThreadFactory getThreadFactory() {
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
System.out.println("线程:" + r.hashCode() + "创建了");
//自定义创建线程
return new Thread(r, "threadPool" + r.hashCode());
}
};
}

}

可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。

ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的

接口名 作用
beforeExecute 线程池中任务运行前执
afterExecute 线程池中任务运行完毕后执行
terminated 线程池退出后执行

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package chapter02.pool;

import java.util.concurrent.*;

public class ThreadPoolExtension {
private static ExecutorService pool;

public static void main(String[] args) {
pool = new ExtensionThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), getThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 20; i++) {
pool.execute(new ThreadTask("ThreadPool-" + i));
}
pool.shutdown();
}

/**
* 自定义线程工厂
*
* @return
*/
public static ThreadFactory getThreadFactory() {
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
System.out.println("线程:" + r.hashCode() + "创建了");
//自定义创建线程
return new Thread(r, "threadPool" + r.hashCode());
}
};
}

static class ExtensionThreadPoolExecutor extends ThreadPoolExecutor {

public ExtensionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public ExtensionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public ExtensionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public ExtensionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}


/**
* 线程运行前执行
*
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("threadName" + t.getName() + "开始进行执行");
}

/**
* 线程完成任务后执行
*
* @param r
* @param t
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
ThreadTask threadTask = (ThreadTask) r;
System.out.println("threadName:" + threadTask.getThreadName() + "执行完成");
}

@Override
protected void terminated() {
System.out.println("线程池退出...");
}

}
}

​ 可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池, 当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

合理配置线程池数量(重点)

线程数一般与CPU核数有关 可以通过一下代码获取CPU核数

1
int nCpu =  Runtime.getRuntime().availableProcessors()

一般业务分为CPU密集型以及IO密集型

CPU密集该任务(run代码)需要大量的运算,而没有阻塞的情况,CPU全速运行。

CPU密集

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。

CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为CPU总的运算能力就那些。

一般设置为n+1

1
nCpu+1

IO密集

​ IO密集型,即该任务需要大量的IO,即大量的阻塞。在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即时在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

一般设置为 2n+1

1
2*nCpu+1

精确设置

以上的设置都是通用设置,对于有些需要特殊优化的可以采用一下精确设置

根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可

1
2
3
4
5
6
7
8
 /**
* Nthreads=CPU数量
* Ucpu=目标CPU的使用率,0<=Ucpu<=1
* w:等待时间
* c:计算实践
* W/C=任务等待时间与任务计算时间的比率
*/
Nthreads = Ncpu*Ucpu*(1+W/C)

评论