1
介绍
什么是线程池
"线程池(英语:thread pool :一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
—— 维基百科
为什么要使用线程池
如何使用线程池
线程池使用有很多种方式,不过按照《Java 开发手册》描述,尽量还是要使用 ThreadPoolExecutor
进行创建。
代码举例:
ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
那创建线程池的这些构造参数有什么含义&#xff1f;线程池的运行原理是什么&#xff1f;下面则开始通过源码及作图一步一步的了解。
2
源码分析
参数介绍
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* ctx 为原子类型的变量, 有两个概念
* workerCount, 表示有效的线程数
* runState, 表示线程状态, 是否正在运行, 关闭等
*/
private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS &#61; Integer.SIZE - 3;
// 容量 2²⁹-1
private static final int CAPACITY &#61; (1 <1;
// runState is stored in the high-order bits 线程池的五中状态
// 即高3位为111, 接受新任务并处理排队任务
private static final int RUNNING &#61; -1 < // 即高3位为000, 不接受新任务, 但处理排队任务
private static final int SHUTDOWN &#61; 0 < // 即高3位为001, 不接受新任务, 不处理排队任务, 并中断正在进行的任务
private static final int STOP &#61; 1 < // 即高3位为010, 所有任务都已终止, 工作线程为0, 线程转换到状态TIDYING, 将运行terminate()钩子方法
private static final int TIDYING &#61; 2 < // 即高3位为011, 标识terminate()已经完成
private static final int TERMINATED &#61; 3 < // Packing and unpacking ctl 用来计算线程的方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}
构造参数及含义
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 省略
}
参数说明&#xff1a;
线程池执行流程
execute 源码
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
// 空则抛出异常
if (command &#61;&#61; null)
throw new NullPointerException();
// 获取当前线程池的状态
int c &#61; ctl.get();
// 计算工作线程数 并判断是否小于核心线程数
if (workerCountOf(c) // addWorker提交任务, 提交成功则结束
if (addWorker(command, true))
return;
// 提交失败再次获取当前状态
c &#61; ctl.get();
}
// 判断线程状态, 并插入队列, 失败则移除
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取状态
int recheck &#61; ctl.get();
// 如果状态不是RUNNING, 并移除失败
if (! isRunning(recheck) && remove(command))
// 调用拒绝策略
reject(command);
// 如果工作线程为0 则调用 addWorker
else if (workerCountOf(recheck) &#61;&#61; 0)
addWorker(null, false);
}
// 提交任务失败 走拒绝策略
else if (!addWorker(command, false))
reject(command);
}
}
execute 方法流程和流程图画的相同&#xff0c;值得注意的是&#xff1a;
即使是核心线程数有空闲线程也会创建新线程&#xff01;
。// 默认工厂
ThreadFactory threadFactory &#61; Executors.defaultThreadFactory();
// google guava工具提供
ThreadFactory namedThreadFactory &#61; new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
一般创建工厂&#xff0c;是为了更好的排查问题&#xff0c;也建议使用工厂指定线程名字。
当线程池达到最大线程数&#xff0c;并且队列满了&#xff0c;新的线程要采取的处理策略。
addWorker 源码
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 检查任务是否可以提交
*
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层循环
for (;;) {
// 获取当前状态
int c &#61; ctl.get();
int rs &#61; runStateOf(c);
// 检查线程池是否关闭
if (rs >&#61; SHUTDOWN &&
! (rs &#61;&#61; SHUTDOWN &&
firstTask &#61;&#61; null &&
! workQueue.isEmpty()))
return false;
// 内层循环 CAS 增加线程个数
for (;;) {
int wc &#61; workerCountOf(c);
// 工作线程大于容量 或者大于 核心或最大线程数
if (wc >&#61; CAPACITY ||
wc >&#61; (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 线程数增加, 成功则调到外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 失败则再次获取线程状态
c &#61; ctl.get(); // Re-read ctl
// 不相等则重新走外层循环
if (runStateOf(c) !&#61; rs)
continue retry;
// 否则内层继续循环
}
}
/**
* 创建新worker 开始新线程
* 此时已经 CAS 成功了
*/
boolean workerStarted &#61; false;
boolean workerAdded &#61; false;
Worker w &#61; null;
try {
// 创建 Worker
w &#61; new Worker(firstTask);
final Thread t &#61; w.thread;
if (t !&#61; null) {
final ReentrantLock mainLock &#61; this.mainLock;
// 加锁&#xff0c;防止多线程同时执行线程池的 execute
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs &#61; runStateOf(ctl.get());
if (rs (rs &#61;&#61; SHUTDOWN && firstTask &#61;&#61; null)) {
// 判断线程是否存活, 已存活抛出非法异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任务
workers.add(w);
int s &#61; workers.size();
// 设置池最大大小, 并将 workerAdded设置为 true
if (s > largestPoolSize)
largestPoolSize &#61; s;
workerAdded &#61; true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 添加成功 开始启动线程 并将 workerStarted 设置为 true
if (workerAdded) {
t.start();
workerStarted &#61; true;
}
}
} finally {
// 启动线程失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
* 启动线程失败, 加锁
* 移除线程, 并减少线程总数
* 转换状态
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock &#61; this.mainLock;
mainLock.lock();
try {
if (w !&#61; null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
}
addWorker 代码比较长&#xff0c;主要分为两部分&#xff1a;
3
总结
Q&A
Q: 线程池的原理及相关参数&#xff1f;
A: 主要参数为核心线程数、阻塞队列、最大线程数、拒绝策略。
Q: 线程池的线程是怎么回收的&#xff1f;
A: 线程被创建之后&#xff0c;如果 task &#61;&#61; null 或者调用 getTask 获取任务为 null
&#xff0c;则调用 processWorkerExit
对线程执行清理工作。
Q: 核心线程是不是就不可以回收了&#xff1f;
A: 核心线程数只会增加&#xff0c;而又没有回收&#xff0c;这时候假如线程池没有任务&#xff0c;就会一直维持核心线程。
当然也可以通过调用 allowCoreThreadTimeOut
方法&#xff0c;设置是否允许回收核心线程。
结束语
通过阅读 ThreadPoolExecutor
了解线程池的基本结构和原理&#xff0c;至于其他的更多扩展&#xff0c;文章篇幅有限&#xff0c;就需要小伙伴们自己阅读了。
- -
历史文章 &#xff5c; 相关推荐