热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ThreadPoolExecutor源码解析(线程池的工作原理)

资料使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题简单分析ThreadPoolExecutor回收工作线程的原理线程有
资料

使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题
简单分析ThreadPoolExecutor回收工作线程的原理

线程有几种状态

public enum State {/*** Thread state for a thread which has not yet started. 尚未启动的线程的线程状态。*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor. * 可运行线程的线程状态。处于可运行状态的线程正在 Java 虚拟机中执行,但它可能正在等待来自操作系统的其他资源,例如处理器。*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.* 线程阻塞等待监视器锁的线程状态。处于阻塞状态的线程在调用后等待监视器锁进入同步块/方法*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* 等待线程的线程状态。由于调用以下方法之一,线程处于等待状态:*

    *
  • {@link Object#wait() Object.wait} with no timeout
  • *
  • {@link #join() Thread.join} with no timeout
  • *
  • {@link LockSupport#park() LockSupport.park}
  • *
**

A thread in the waiting state is waiting for another thread to* perform a particular action.* 处于等待状态的线程正在等待另一个线程执行特定操作。** For example, a thread that has called {@code Object.wait()}* on an object is waiting for another thread to call* {@code Object.notify()} or {@code Object.notifyAll()} on* that object. A thread that has called {@code Thread.join()}* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* 具有指定等待时间的等待线程的线程状态。由于以指定的正等待时间调用以下方法之一,线程处于定时等待状态:*

    *
  • {@link #sleep Thread.sleep}
  • *
  • {@link Object#wait(long) Object.wait} with timeout
  • *
  • {@link #join(long) Thread.join} with timeout
  • *
  • {@link LockSupport#parkNanos LockSupport.parkNanos}
  • *
  • {@link LockSupport#parkUntil LockSupport.parkUntil}
  • *
*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.* 已终止线程的线程状态。线程已完成执行。*/TERMINATED;}

1. ThreadPoolExecutor构造函数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}int corePoolSize, // 核心线程
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 持续时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞线程
ThreadFactory threadFactory //工厂方法

2. public void execute(Runnable command)

public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn&#39;t, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c &#61; ctl.get();// 小于核心线程&#xff0c;直接加入if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c &#61; ctl.get();}// 判断是否加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();if (! isRunning(recheck) && remove(command))reject(command); // 拒绝策略else if (workerCountOf(recheck) &#61;&#61; 0)addWorker(null, false);}else if (!addWorker(command, false)) // 尝试加入非核心线程reject(command); // 没有加入成功&#xff0c;拒绝策略}

总结&#xff1a;

  1. 先尝试加入核心线程&#xff08;如果还没超过核心线程数&#xff09;
  2. 再尝试加入阻塞队列&#xff0c;如果没有SHUTDOWN&#xff0c;判断核心运行为0&#xff0c;则再次运行
  3. 最后加入非核心线程&#xff0c;没有加入&#xff0c;怎拒绝策略

3. addWorker

/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** &#64;param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** &#64;param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* &#64;return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c &#61; ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask !&#61; null|| workQueue.isEmpty()))return false;for (;;) {// 如果不满足核心和非核心线程数&#xff0c;就falseif (workerCountOf(c)>&#61; ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c &#61; ctl.get(); // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted &#61; false;boolean workerAdded &#61; false;Worker w &#61; null;try {// 创建一个Worker工作类w &#61; new Worker(firstTask);final Thread t &#61; w.thread;if (t !&#61; null) {final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c &#61; ctl.get();if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask &#61;&#61; null)) {if (t.getState() !&#61; Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded &#61; true;int s &#61; workers.size();if (s > largestPoolSize)largestPoolSize &#61; s;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted &#61; true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

总结&#xff1a;
1. 判断是否在核心和非核心线程内
2. 创建一个Worker
3. 添加到workers线程中
4. 开启Worker中Thread的线程

4. Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID &#61; 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;// TODO: switch to AbstractQueuedLongSynchronizer and move// completedTasks into the lock word./*** Creates with given first task and thread from ThreadFactory.* &#64;param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask &#61; firstTask;this.thread &#61; getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {// 真正运行线程开始runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() !&#61; 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >&#61; 0 && (t &#61; thread) !&#61; null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

总结&#xff1a;

  1. this.thread &#61; getThreadFactory().newThread(this); 要运行Worker里面的run方法
  2. run中运行外面的runWorker方法
  3. firstTask也就起到一个保存&#xff0c;和被调用的作用

5. runWorker

/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don&#39;t need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread&#39;s* UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread&#39;s UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** &#64;param w the worker*/final void runWorker(Worker w) {Thread wt &#61; Thread.currentThread();Runnable task &#61; w.firstTask;w.firstTask &#61; null;w.unlock(); // allow interruptsboolean completedAbruptly &#61; true;try {// 1. task不为空// 2. getTask 从阻塞列表中获取while (task !&#61; null || (task &#61; getTask()) !&#61; null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task &#61; null;w.completedTasks&#43;&#43;;w.unlock();}}completedAbruptly &#61; false;} finally {processWorkerExit(w, completedAbruptly);}}

总结;

  1. task不为空&#xff0c;就直接执行task
  2. task为空(firstTask&#61;&#61;null)&#xff0c;通过getTask获取任务
  3. 此时runWorker在Thread线程里
  4. 运行完之后调用processWorkerExit(…

getTask

/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {&#64;code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** &#64;return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut &#61; false; // Did the last poll() time out?for (;;) {int c &#61; ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc &#61; workerCountOf(c);// Are workers subject to culling?boolean timed &#61; allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 从阻塞队列中获取&#xff0c; 此处有用到延迟消息// poll(keepAliveTime 是延迟获取// take() 是阻塞获取Runnable r &#61; timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r !&#61; null)return r;timedOut &#61; true;} catch (InterruptedException retry) {timedOut &#61; false;}}}

总结:

  1. 从阻塞队列中获取&#xff0c; 此处有用到延迟消息
  2. poll(keepAliveTime 是延迟获取
  3. take() 是阻塞获取

processWorkerExit(w, completedAbruptly);

/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** &#64;param w the worker* &#64;param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusteddecrementWorkerCount();final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {completedTaskCount &#43;&#61; w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c &#61; ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min &#61; allowCoreThreadTimeOut ? 0 : corePoolSize;if (min &#61;&#61; 0 && ! workQueue.isEmpty())min &#61; 1;if (workerCountOf(c) >&#61; min)return; // replacement not needed}addWorker(null, false);}}

总结&#xff1a;

  1. 回收Worker
  2. 通过addWorker添加到worker集合中

怎样回收线程

addWorkder(null, false);
添加一个null的runnable, 当为空时候,会从队列中去取.


推荐阅读
  • 深入解析Java并发之ArrayBlockingQueue
    本文详细探讨了ArrayBlockingQueue,这是一种基于数组实现的阻塞队列。ArrayBlockingQueue在初始化时需要指定容量,因此它是一个有界的阻塞队列。文章不仅介绍了其基本概念和数据结构,还深入分析了其源码实现,包括各种入队、出队、获取元素和删除元素的方法。 ... [详细]
  • 本文详细介绍了Oracle RMAN中的增量备份机制,重点解析了差异增量和累积增量备份的概念及其在不同Oracle版本中的实现。通过对比两种备份方式的特点,帮助读者选择合适的备份策略。 ... [详细]
  • 本文介绍了如何通过创建自定义 XML 文件来修改 Android 中 Spinner 的项样式,包括颜色和大小的调整。 ... [详细]
  • 本文提供了一个SQL脚本,用于在Microsoft SQL Server中创建一个数据字典视图,该视图详细列出了表名、表描述、字段名称、字段描述、字段类型、字段大小、字段精度、是否可为空、默认值以及是否为标识或主键等信息。 ... [详细]
  • A1166 峰会区域安排问题(25分)PAT甲级 C++满分解析【图论】
    峰会是指国家元首或政府首脑之间的会议。合理安排峰会的休息区是一项复杂的工作,理想的情况是邀请的每位领导人都是彼此的直接朋友。 ... [详细]
  • 本文探讨了Java中有效停止线程的多种方法,包括使用标志位、中断机制及处理阻塞I/O操作等,旨在帮助开发者避免使用已废弃的危险方法,确保线程安全和程序稳定性。 ... [详细]
  • 深入解析mt_allocator内存分配器(二):多线程与单线程场景下的实现
    本文详细介绍了mt_allocator内存分配器在多线程和单线程环境下的实现机制。该分配器以2的幂次方字节为单位分配内存,支持灵活的配置和高效的性能。文章分为内存池特性描述、内存池实现、单线程内存池实现、内存池策略类实现及多线程内存池实现等部分,深入探讨了内存池的初始化、内存分配与回收的具体实现。 ... [详细]
  • 本文详细介绍了如何在本地环境中安装配置Frida及其服务器组件,以及如何通过Frida进行基本的应用程序动态分析,包括获取应用版本和加载的类信息。 ... [详细]
  • 本文详细解析 Skynet 的启动流程,包括配置文件的读取、环境变量的设置、主要线程的启动(如 timer、socket、monitor 和 worker 线程),以及消息队列的实现机制。 ... [详细]
  • 本文介绍了一个将 Java 实体对象转换为 Map 的工具类,通过反射机制获取实体类的字段并将其值映射到 Map 中,适用于需要将对象数据结构化处理的场景。 ... [详细]
  • 文章目录IntroductionRelatedWork网络稀疏化(NetworkSlimming)whychoosechennel-levelspars ... [详细]
  • iOS 小组件开发指南
    本文详细介绍了iOS小部件(Widget)的开发流程,从环境搭建、证书配置到业务逻辑实现,提供了一系列实用的技术指导与代码示例。 ... [详细]
  • 本文将详细介绍如何配置并整合MVP架构、Retrofit网络请求库、Dagger2依赖注入框架以及RxAndroid响应式编程库,构建高效、模块化的Android应用。 ... [详细]
  • Python Selenium WebDriver 浏览器驱动详解与实践
    本文详细介绍了如何使用Python结合Selenium和unittest构建自动化测试框架,重点解析了WebDriver浏览器驱动的配置与使用方法,涵盖Chrome、Firefox、IE/Edge等主流浏览器。 ... [详细]
  • 本文探讨了一个Web工程项目的需求,即允许用户随时添加定时任务,并通过Quartz框架实现这些任务的自动化调度。文章将介绍如何设计任务表以存储任务信息和执行周期,以及如何通过一个定期扫描机制自动识别并加载新任务到调度系统中。 ... [详细]
author-avatar
ruigh
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有