热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ThreadPoolExecutor源码解析(线程池的工作原理)

资料使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题简单分析ThreadPoolExecutor回收工作线程的原理线程有
资料

使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题
简单分析ThreadPoolExecutor回收工作线程的原理

线程有几种状态

public enum State {/*** Thread state for a thread which has not yet started. 尚未启动的线程的线程状态。*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor. * 可运行线程的线程状态。处于可运行状态的线程正在 Java 虚拟机中执行,但它可能正在等待来自操作系统的其他资源,例如处理器。*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.* 线程阻塞等待监视器锁的线程状态。处于阻塞状态的线程在调用后等待监视器锁进入同步块/方法*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* 等待线程的线程状态。由于调用以下方法之一,线程处于等待状态:*

    *
  • {@link Object#wait() Object.wait} with no timeout
  • *
  • {@link #join() Thread.join} with no timeout
  • *
  • {@link LockSupport#park() LockSupport.park}
  • *
**

A thread in the waiting state is waiting for another thread to* perform a particular action.* 处于等待状态的线程正在等待另一个线程执行特定操作。** For example, a thread that has called {@code Object.wait()}* on an object is waiting for another thread to call* {@code Object.notify()} or {@code Object.notifyAll()} on* that object. A thread that has called {@code Thread.join()}* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* 具有指定等待时间的等待线程的线程状态。由于以指定的正等待时间调用以下方法之一,线程处于定时等待状态:*

    *
  • {@link #sleep Thread.sleep}
  • *
  • {@link Object#wait(long) Object.wait} with timeout
  • *
  • {@link #join(long) Thread.join} with timeout
  • *
  • {@link LockSupport#parkNanos LockSupport.parkNanos}
  • *
  • {@link LockSupport#parkUntil LockSupport.parkUntil}
  • *
*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.* 已终止线程的线程状态。线程已完成执行。*/TERMINATED;}

1. ThreadPoolExecutor构造函数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}int corePoolSize, // 核心线程
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 持续时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞线程
ThreadFactory threadFactory //工厂方法

2. public void execute(Runnable command)

public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn&#39;t, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c &#61; ctl.get();// 小于核心线程&#xff0c;直接加入if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c &#61; ctl.get();}// 判断是否加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();if (! isRunning(recheck) && remove(command))reject(command); // 拒绝策略else if (workerCountOf(recheck) &#61;&#61; 0)addWorker(null, false);}else if (!addWorker(command, false)) // 尝试加入非核心线程reject(command); // 没有加入成功&#xff0c;拒绝策略}

总结&#xff1a;

  1. 先尝试加入核心线程&#xff08;如果还没超过核心线程数&#xff09;
  2. 再尝试加入阻塞队列&#xff0c;如果没有SHUTDOWN&#xff0c;判断核心运行为0&#xff0c;则再次运行
  3. 最后加入非核心线程&#xff0c;没有加入&#xff0c;怎拒绝策略

3. addWorker

/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** &#64;param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** &#64;param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* &#64;return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c &#61; ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask !&#61; null|| workQueue.isEmpty()))return false;for (;;) {// 如果不满足核心和非核心线程数&#xff0c;就falseif (workerCountOf(c)>&#61; ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c &#61; ctl.get(); // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted &#61; false;boolean workerAdded &#61; false;Worker w &#61; null;try {// 创建一个Worker工作类w &#61; new Worker(firstTask);final Thread t &#61; w.thread;if (t !&#61; null) {final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c &#61; ctl.get();if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask &#61;&#61; null)) {if (t.getState() !&#61; Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded &#61; true;int s &#61; workers.size();if (s > largestPoolSize)largestPoolSize &#61; s;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted &#61; true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

总结&#xff1a;
1. 判断是否在核心和非核心线程内
2. 创建一个Worker
3. 添加到workers线程中
4. 开启Worker中Thread的线程

4. Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID &#61; 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;// TODO: switch to AbstractQueuedLongSynchronizer and move// completedTasks into the lock word./*** Creates with given first task and thread from ThreadFactory.* &#64;param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask &#61; firstTask;this.thread &#61; getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {// 真正运行线程开始runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() !&#61; 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >&#61; 0 && (t &#61; thread) !&#61; null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

总结&#xff1a;

  1. this.thread &#61; getThreadFactory().newThread(this); 要运行Worker里面的run方法
  2. run中运行外面的runWorker方法
  3. firstTask也就起到一个保存&#xff0c;和被调用的作用

5. runWorker

/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don&#39;t need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread&#39;s* UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread&#39;s UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** &#64;param w the worker*/final void runWorker(Worker w) {Thread wt &#61; Thread.currentThread();Runnable task &#61; w.firstTask;w.firstTask &#61; null;w.unlock(); // allow interruptsboolean completedAbruptly &#61; true;try {// 1. task不为空// 2. getTask 从阻塞列表中获取while (task !&#61; null || (task &#61; getTask()) !&#61; null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task &#61; null;w.completedTasks&#43;&#43;;w.unlock();}}completedAbruptly &#61; false;} finally {processWorkerExit(w, completedAbruptly);}}

总结;

  1. task不为空&#xff0c;就直接执行task
  2. task为空(firstTask&#61;&#61;null)&#xff0c;通过getTask获取任务
  3. 此时runWorker在Thread线程里
  4. 运行完之后调用processWorkerExit(…

getTask

/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {&#64;code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** &#64;return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut &#61; false; // Did the last poll() time out?for (;;) {int c &#61; ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc &#61; workerCountOf(c);// Are workers subject to culling?boolean timed &#61; allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 从阻塞队列中获取&#xff0c; 此处有用到延迟消息// poll(keepAliveTime 是延迟获取// take() 是阻塞获取Runnable r &#61; timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r !&#61; null)return r;timedOut &#61; true;} catch (InterruptedException retry) {timedOut &#61; false;}}}

总结:

  1. 从阻塞队列中获取&#xff0c; 此处有用到延迟消息
  2. poll(keepAliveTime 是延迟获取
  3. take() 是阻塞获取

processWorkerExit(w, completedAbruptly);

/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** &#64;param w the worker* &#64;param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusteddecrementWorkerCount();final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {completedTaskCount &#43;&#61; w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c &#61; ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min &#61; allowCoreThreadTimeOut ? 0 : corePoolSize;if (min &#61;&#61; 0 && ! workQueue.isEmpty())min &#61; 1;if (workerCountOf(c) >&#61; min)return; // replacement not needed}addWorker(null, false);}}

总结&#xff1a;

  1. 回收Worker
  2. 通过addWorker添加到worker集合中

怎样回收线程

addWorkder(null, false);
添加一个null的runnable, 当为空时候,会从队列中去取.


推荐阅读
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文详细介绍了 Apache Jena 库中的 Txn.executeWrite 方法,通过多个实际代码示例展示了其在不同场景下的应用,帮助开发者更好地理解和使用该方法。 ... [详细]
  • Scala 实现 UTF-8 编码属性文件读取与克隆
    本文介绍如何使用 Scala 以 UTF-8 编码方式读取属性文件,并实现属性文件的克隆功能。通过这种方式,可以确保配置文件在多线程环境下的一致性和高效性。 ... [详细]
  • 本文详细探讨了JDBC(Java数据库连接)的内部机制,重点分析其作为服务提供者接口(SPI)框架的应用。通过类图和代码示例,展示了JDBC如何注册驱动程序、建立数据库连接以及执行SQL查询的过程。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 本文详细介绍了如何构建一个高效的UI管理系统,集中处理UI页面的打开、关闭、层级管理和页面跳转等问题。通过UIManager统一管理外部切换逻辑,实现功能逻辑分散化和代码复用,支持多人协作开发。 ... [详细]
  • 作为一名新手,您可能会在初次尝试使用Eclipse进行Struts开发时遇到一些挑战。本文将为您提供详细的指导和解决方案,帮助您克服常见的配置和操作难题。 ... [详细]
  • 本文提供了使用Java实现Bellman-Ford算法解决POJ 3259问题的代码示例,详细解释了如何通过该算法检测负权环来判断时间旅行的可能性。 ... [详细]
  • 本问题探讨了在特定条件下排列儿童队伍的方法数量。题目要求计算满足条件的队伍排列总数,并使用递推算法和大数处理技术来解决这一问题。 ... [详细]
  • 本题要求在一组数中反复取出两个数相加,并将结果放回数组中,最终求出最小的总加法代价。这是一个经典的哈夫曼编码问题,利用贪心算法可以有效地解决。 ... [详细]
author-avatar
ruigh
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有