热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

hystrix源码线程池隔离_线程池ThreadPoolExecutor原理及源码笔记

前言前面在学习JUC源码时,很多代码举例中都使用了线程池ThreadPoolExecutor,并且在工作中也经常用到线程池,所以现在就一
4371738cc38a99c20ebe268e48a3aa64.png 前言"前面在学习 JUC 源码时,很多代码举例中都使用了线程池 ThreadPoolExecutor ,并且在工作中也经常用到线程池,所以现在就一步一步看看,线程池的源码,了解其背后的核心原理。"

  1  

介绍

  什么是线程池

"

线程池(英语:thread pool :一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

—— 维基百科

  为什么要使用线程池

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

  如何使用线程池

线程池使用有很多种方式,不过按照《Java 开发手册》描述,尽量还是要使用 ThreadPoolExecutor 进行创建。

a3e39d87d2555fd16c6c1c263f5657c2.png

代码举例:


ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());

那创建线程池的这些构造参数有什么含义&#xff1f;线程池的运行原理是什么&#xff1f;下面则开始通过源码及作图一步一步的了解。

  2  

源码分析

  参数介绍

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
    * ctx 为原子类型的变量, 有两个概念
    * workerCount, 表示有效的线程数
    * runState, 表示线程状态, 是否正在运行, 关闭等
    */
    private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS &#61; Integer.SIZE - 3;
    // 容量 2²⁹-1
    private static final int CAPACITY   &#61; (1 <1;
    // runState is stored in the high-order bits 线程池的五中状态
    // 即高3位为111, 接受新任务并处理排队任务
    private static final int RUNNING    &#61; -1 <    // 即高3位为000, 不接受新任务, 但处理排队任务
    private static final int SHUTDOWN   &#61;  0 <    // 即高3位为001, 不接受新任务, 不处理排队任务, 并中断正在进行的任务
    private static final int STOP       &#61;  1 <    // 即高3位为010, 所有任务都已终止, 工作线程为0, 线程转换到状态TIDYING, 将运行terminate()钩子方法
    private static final int TIDYING    &#61;  2 <    // 即高3位为011, 标识terminate()已经完成
    private static final int TERMINATED &#61;  3 <    // Packing and unpacking ctl 用来计算线程的方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

  构造参数及含义


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 省略
}

参数说明&#xff1a;

  1. corePoolSize - 核心线程数&#xff0c;提交任务时&#xff0c;如果当前线程池的数量小于 corePoolSize&#xff0c;则创建新线程执行任务。
  2. maximumPoolSize - 最大线程数&#xff0c;如果阻塞队列已满&#xff0c;并且线程数小于 maximumPoolSize&#xff0c;则会创建新线程执行任务。
  3. keepAliveTime - 当线程数大于核心线程数时&#xff0c;且线程空闲&#xff0c;keepAliveTime 时间后会销毁线程。
  4. unit - keepAliveTime 的时间单位。
  5. workQueue - 阻塞队列&#xff0c;当线程数大于核心线程数时&#xff0c;用来保存任务。
  6. threadFactory - 线程创建的工厂。
  7. handler - 线程饱和策略。

  线程池执行流程

06b6b3085993199bccbf59dae5bbf91e.png

  execute 源码

public class ThreadPoolExecutor extends AbstractExecutorService {

    public void execute(Runnable command) {
        // 空则抛出异常
        if (command &#61;&#61; null)
            throw new NullPointerException();
        // 获取当前线程池的状态
        int c &#61; ctl.get();
        // 计算工作线程数 并判断是否小于核心线程数
        if (workerCountOf(c)             // addWorker提交任务, 提交成功则结束
            if (addWorker(command, true))
                return;
            // 提交失败再次获取当前状态
            c &#61; ctl.get();
        }
        // 判断线程状态, 并插入队列, 失败则移除
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取状态
            int recheck &#61; ctl.get();
            // 如果状态不是RUNNING, 并移除失败
            if (! isRunning(recheck) && remove(command))
                // 调用拒绝策略
                reject(command);
            // 如果工作线程为0 则调用 addWorker
            else if (workerCountOf(recheck) &#61;&#61; 0)
                addWorker(null, false);
        }
        // 提交任务失败 走拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

}

execute 方法流程和流程图画的相同&#xff0c;值得注意的是&#xff1a;

  1. 当前线程数小于核心线程数&#xff0c;则会创建新线程&#xff0c;这里即使是核心线程数有空闲线程也会创建新线程&#xff01;
  2. 而核心线程里面的空闲线程会不断执行阻塞队列里面的任务。
  • workQueue阻塞队列&#xff1a;
  1. ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列&#xff0c;此队列按 FIFO(先进先出) 原则对元素进行排序。
  2. LinkedBlockingQueue: 一个基于链表结构的阻塞队列,此队列按 FIFO(先进先出) 排序元素&#xff0c;吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  3. SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作。否则插入操作一直处于阻塞状态&#xff0c;吞吐量通常要高于LinkedBlockingQueue&#xff0c;静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  4. PriorityBlockingQueue: 一个具有优先级的无限阻塞队列。
  • 线程工厂&#xff1a;

// 默认工厂
ThreadFactory threadFactory &#61; Executors.defaultThreadFactory();
// google guava工具提供
ThreadFactory namedThreadFactory &#61; new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

一般创建工厂&#xff0c;是为了更好的排查问题&#xff0c;也建议使用工厂指定线程名字。

  • handler线程拒绝策略&#xff1a;

当线程池达到最大线程数&#xff0c;并且队列满了&#xff0c;新的线程要采取的处理策略。

  1. AbortPolicy 拒绝新任务并抛出RejectedExecutionException异常。
  2. CallerRunsPolicy 直接在调用程序的线程中运行。
  3. DiscardOldestPolicy 放弃最早的任务, 即队列最前面的任务。
  4. DiscardPolicy 丢弃&#xff0c;不处理。

  addWorker 源码

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 检查任务是否可以提交
     *
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 外层循环 
        for (;;) {
            // 获取当前状态
            int c &#61; ctl.get();
            int rs &#61; runStateOf(c);

            // 检查线程池是否关闭
            if (rs >&#61; SHUTDOWN &&
                ! (rs &#61;&#61; SHUTDOWN &&
                   firstTask &#61;&#61; null &&
                   ! workQueue.isEmpty()))
                return false;
            // 内层循环 CAS 增加线程个数
            for (;;) {
                int wc &#61; workerCountOf(c);
                // 工作线程大于容量 或者大于 核心或最大线程数
                if (wc >&#61; CAPACITY ||
                    wc >&#61; (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 线程数增加, 成功则调到外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 失败则再次获取线程状态
                c &#61; ctl.get();  // Re-read ctl
                // 不相等则重新走外层循环
                if (runStateOf(c) !&#61; rs)
                    continue retry;
                // 否则内层继续循环
            }
        }
        /**
         * 创建新worker 开始新线程
         * 此时已经 CAS 成功了
         */
        boolean workerStarted &#61; false;
        boolean workerAdded &#61; false;
        Worker w &#61; null;
        try {
            // 创建 Worker
            w &#61; new Worker(firstTask);
            final Thread t &#61; w.thread;
            if (t !&#61; null) {
                final ReentrantLock mainLock &#61; this.mainLock;
                // 加锁&#xff0c;防止多线程同时执行线程池的 execute
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs &#61; runStateOf(ctl.get());

                    if (rs                         (rs &#61;&#61; SHUTDOWN && firstTask &#61;&#61; null)) {
                        // 判断线程是否存活, 已存活抛出非法异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加任务
                        workers.add(w);
                        int s &#61; workers.size();
                        // 设置池最大大小, 并将 workerAdded设置为 true
                        if (s > largestPoolSize)
                            largestPoolSize &#61; s;
                        workerAdded &#61; true;
                    }
                } finally {
                    // 解锁
                    mainLock.unlock();
                }
                // 添加成功 开始启动线程 并将 workerStarted 设置为 true
                if (workerAdded) {
                    t.start();
                    workerStarted &#61; true;
                }
            }
        } finally {
            // 启动线程失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    /**
     * 启动线程失败, 加锁
     * 移除线程, 并减少线程总数
     * 转换状态
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock &#61; this.mainLock;
        mainLock.lock();
        try {
            if (w !&#61; null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

}

addWorker 代码比较长&#xff0c;主要分为两部分&#xff1a;

  1. 双重循环&#xff0c;使用 CAS 增加线程数。
  2. 创建工作线程 Worker &#xff0c;并使用独占锁&#xff0c;将其添加到线程池&#xff0c;并启动。

  3  

总结

  Q&A

Q: 线程池的原理及相关参数&#xff1f;

A: 主要参数为核心线程数、阻塞队列、最大线程数、拒绝策略。

Q: 线程池的线程是怎么回收的&#xff1f;

A: 线程被创建之后&#xff0c;如果 task &#61;&#61; null 或者调用 getTask 获取任务为 null&#xff0c;则调用 processWorkerExit 对线程执行清理工作。

Q: 核心线程是不是就不可以回收了&#xff1f;

A: 核心线程数只会增加&#xff0c;而又没有回收&#xff0c;这时候假如线程池没有任务&#xff0c;就会一直维持核心线程。

当然也可以通过调用 allowCoreThreadTimeOut 方法&#xff0c;设置是否允许回收核心线程。

  结束语

通过阅读 ThreadPoolExecutor 了解线程池的基本结构和原理&#xff0c;至于其他的更多扩展&#xff0c;文章篇幅有限&#xff0c;就需要小伙伴们自己阅读了。

- -


历史文章 &#xff5c; 相关推荐

  • ReentrantLock 源码、画图一起看一看&#xff01;
  • ReentrantReadWriteLock 的原理&#xff01;
  • Spring 自调用事务失效&#xff0c;你是怎么解决的&#xff1f;

117f80f69d85f88f5ac93001562273f4.gif




推荐阅读
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • LeetCode笔记:剑指Offer 41. 数据流中的中位数(Java、堆、优先队列、知识点)
    本文介绍了LeetCode剑指Offer 41题的解题思路和代码实现,主要涉及了Java中的优先队列和堆排序的知识点。优先队列是Queue接口的实现,可以对其中的元素进行排序,采用小顶堆的方式进行排序。本文还介绍了Java中queue的offer、poll、add、remove、element、peek等方法的区别和用法。 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
author-avatar
你说的白是小白的白_958
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有