热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java并发编程札记(六)JUC线程池02ThreadPoolExecutor实现原理

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。简介为什么要使用线程池许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。


简介

为什么要使用线程池
许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请求,就创建一个线程来处理,表面看是没有问题的,但实际上存在着很严重的缺陷。服务器应用程序中经常出现的情况是请求处理的任务很简单但客户端的数目却是庞大的,这种情况下如果还是每收到一个请求就创建一个线程来处理它,服务器在创建和销毁线程所花费的时间和资源可能比处理客户端请求处理的任务花费的时间和资源更多。为了缓解服务器压力,需要解决频繁创建和销毁线程的问题。线程池可以实现这个需求。

什么是线程池
线程池可以看做是许多线程的集合。在没有任务时线程处于空闲状态,当请求到来,线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务。这样就实现了线程的重用。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

工作模型
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/workModel.png

工作模型中一共有三种队列:正在执行的任务队列,等待被执行的阻塞队列,等待被commit进阻塞队列中的任务队列。

Java中的线程池
Java中常用的线程池有三个,最出名的当然是ThreadPoolExecutor,除此之外还有ScheduledThreadPoolExecutor、ForkJoinPool。本文主要学习ThreadPoolExecutor的实现原理。


创建ThreadPoolExecutor线程池

强烈推荐使用Executors工厂方法创建线程池,如Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。本文主要学习如何手动配置,下面是ThreadPoolExecutor的一个构造方法。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize <0 ||maximumPoolSize <&#61; 0 ||maximumPoolSize 0)throw new IllegalArgumentException();if (workQueue &#61;&#61; null || threadFactory &#61;&#61; null || handler &#61;&#61; null)throw new NullPointerException();this.corePoolSize &#61; corePoolSize;this.maximumPoolSize &#61; maximumPoolSize;this.workQueue &#61; workQueue;this.keepAliveTime &#61; unit.toNanos(keepAliveTime);this.threadFactory &#61; threadFactory;this.handler &#61; handler;
}

ThreadPoolExecutor一共有四个构造方法&#xff0c;其他三个构造方法都是通过上述的构造方法来实现的。毫无疑问手动配置线程池的关键就是学好构造方法中的几个参数如何设置。这几个参数对应着ThreadPoolExecutor中的几个成员属性。


属性

corePoolSize与maximumPoolSize分别是核心池大小与最大池大小。在源码中的声明为
private volatile int corePoolSize;private volatile int maximumPoolSize;

当新任务在方法 execute(java.lang.Runnable) 中提交时&#xff0c;如果运行的线程少于 corePoolSize&#xff0c;则创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize&#xff0c;则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同&#xff0c;则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值&#xff08;如 Integer.MAX_VALUE&#xff09;&#xff0c;则允许池适应任意数量的并发任务。在大多数情况下&#xff0c;核心和最大池大小仅基于构造来设置&#xff0c;不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

workQueue是线程池工作模型中的阻塞队列&#xff0c;用于传输和保持提交的任务。在源码中的声明为private final BlockingQueue workQueue;

keepAliveTime是池中线程空闲时的活动时间。如果池中当前有多于 corePoolSize 的线程&#xff0c;则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止&#xff08;参见 getKeepAliveTime(java.util.concurrent.TimeUnit)&#xff09;。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动&#xff0c;则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下&#xff0c;保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0&#xff0c; allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

threadFactory是一个线程集合。线程池可以使用ThreadFactory创建新线程。如果没有另外说明&#xff0c;则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程&#xff0c;并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory&#xff0c;可以改变线程的名称、线程组、优先级、守护进程状态&#xff0c;等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程&#xff0c;则执行程序将继续运行&#xff0c;但不能执行任何任务。

handler是线程池拒绝策略&#xff0c;RejectedExecutionHandler类型的对象。当 Executor 已经关闭&#xff0c;并且 Executor 将有限边界用于最大线程和工作队列容量&#xff0c;且已经饱和时&#xff0c;在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下&#xff0c; execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略&#xff1a;


  • ThreadPoolExecutor.AbortPolicy &#xff0c;默认策略&#xff0c;处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy&#xff0c;线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制&#xff0c;能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy&#xff0c;不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy&#xff0c;如果执行程序尚未关闭&#xff0c;则位于工作队列头部的任务将被删除&#xff0c;然后重试执行程序&#xff08;如果再次失败&#xff0c;则重复此过程&#xff09;。

除了上面的几个属性外&#xff0c;ThreadPoolExecutor还有下面的几个参数。

//ctl是一个AtomicInteger类型的原子对象。ctl记录了线程池中的任务数量和线程池状态。
private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
//线程池的锁
private final ReentrantLock mainLock &#61; new ReentrantLock();
//所有工作的线程
private final HashSet workers &#61; new HashSet();
//支持的等待condition
private final Condition termination &#61; mainLock.newCondition();
//线程池中线程数量曾经达到过的最大值
private int largestPoolSize;
//已完成任务数量
private long completedTaskCount;
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;

排队策略

排队有三种通用策略&#xff1a;
SynchronousQueue
它将任务直接传输给工作队列workers&#xff0c;而不保持任务。如果不存在空闲线程&#xff0c;则会新建一个线程来执行任务。比如&#xff0c;在Executors.newCachedThreadPool()方法中使用的就是此策略。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());
}

LinkedBlockingQueue
有界队列&#xff0c;使用此队列会导致在所有corePoolSize线程都忙时新任务在队列中等待。这样&#xff0c;创建的线程就不会超过corePoolSize。比如&#xff0c;在Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()方法中使用的就是此策略。

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}

ArrayBlockingQueue
有界队列&#xff0c;没见到在哪里用到了这种策略。


线程池状态

源码已经告诉了我们线程池有几个状态。

private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS &#61; Integer.SIZE - 3;
private static final int CAPACITY &#61; (1 <1;// runState is stored in the high-order bits
private static final int RUNNING &#61; -1 <private static final int SHUTDOWN &#61; 0 <private static final int STOP &#61; 1 <private static final int TIDYING &#61; 2 <private static final int TERMINATED &#61; 3 <

可以看出&#xff0c;一共有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。ctl对象一共32位&#xff0c;高3位保存线程池状态信息&#xff0c;后29位保存线程池容量信息。线程池的初始化状态是RUNNING&#xff0c;在源码中体现为private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));


状态高三位工作队列workers中的任务阻塞队列workQueue中的任务未添加的任务
RUNNING111继续处理继续处理添加
SHUTDOWN000继续处理继续处理不添加
STOP001尝试中断不处理不添加
TIDYING010处理完了如果由SHUTDOWN - TIDYING &#xff0c;那就是处理完了&#xff1b;如果由STOP - TIDYING &#xff0c;那就是不处理不添加
TERMINATED011同TIDYING同TIDYING同TIDYING

各个状态的转换图如下所示
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/status.png


执行任务

execute(Runnable)

/** 在将来某个时间执行给定任务。* 可以在新线程中或者在现有池线程中执行该任务。* 如果无法将任务提交执行&#xff0c;或者因为此执行程序已关闭&#xff0c;或者因为已达到其容量&#xff0c;* 则该任务由当前 RejectedExecutionHandler 处理。*/
public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();int c &#61; ctl.get();//分三种情况处理//case1&#xff1a;如果线程池中运行的线程数量if (workerCountOf(c) //创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的if (addWorker(command, true))return;c &#61; ctl.get();}//case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();//再次检查线程池的状态&#xff0c;如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务if (! isRunning(recheck) && remove(command))//该任务由当前 RejectedExecutionHandler 处理reject(command);//如果线程池中运行的线程数量为0else if (workerCountOf(recheck) &#61;&#61; 0)//则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。addWorker(null, false);}//case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败else if (!addWorker(command, false))//该任务由当前 RejectedExecutionHandler 处理reject(command);
}

看完后&#xff0c;我们知道execute()分三种情况处理任务

case1&#xff1a;如果线程池中运行的线程数量 case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中&#xff0c;就再次检查线程池的状态&#xff0c;1.如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。2.否则如果线程池中运行的线程数量为0&#xff0c;则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。
case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。

submit

/*** 提交一个 Runnable 任务用于执行&#xff0c;并返回一个表示该任务的 Future。* 该 Future 的 get 方法在 成功 完成时将会返回 null。*/
public Future submit(Runnable task) {if (task &#61;&#61; null) throw new NullPointerException();RunnableFuture ftask &#61; newTaskFor(task, null);execute(ftask);return ftask;
}

可以看到此方法是通过调用execute(Runnable)实现的。


关闭线程池

ThreadPoolExecutor提供了shutdown()和shutdownNow()两个方法来关闭线程池。shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;但是不接受新任务。shutdownNow()尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。

shutdown()

/** 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;不接受新任务。* 如果已经关闭&#xff0c;则调用没有其他作用。*/
public void shutdown() {final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.将线程池状态设为SHUTDOWNadvanceRunState(SHUTDOWN);//step4.中断所有空闲线程interruptIdleWorkers();//step5.钩子函数&#xff0c;没有执行任何操作onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();
}

将shutdown()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.将线程池状态设为SHUTDOWN。一旦将线程池状态设为SHUTDOWN&#xff0c;就不能像线程池中添加新任务了。
step4.中断所有空闲线程
step5.钩子函数&#xff0c;没有执行任何操作
step6.释放锁
step7.将线程池状态设置为TERMINATED

shutdownNow()

/** 尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。* /
public List shutdownNow() {List tasks;final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.尝试停止所有的活动执行任务advanceRunState(STOP);//step4.暂停等待任务的处理interruptWorkers();//step5.获取等待执行的任务列表tasks &#61; drainQueue();} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();//step8.返回等待执行的任务列表return tasks;
}

将shutdownNow()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.尝试停止所有的活动执行任务
step4.暂停等待任务的处理
step5.获取等待执行的任务列表
step6.释放锁
step7.将线程池状态设置为TERMINATED
step8.返回等待执行的任务列表

shutdown()和shutdownNow()的区别


  • 调用shutdown()后&#xff0c;线程池状态立刻变为SHUTDOWN&#xff0c;而调用shutdownNow()&#xff0c;线程池状态立刻变为STOP。
  • shutdown()通过中断空闲线程、不接受新任务的方式按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;shutdownNow()无差别地停止所有的活动执行任务&#xff0c;暂停等待任务的处理。也就是说&#xff0c;shutdown()等待任务执行完才中断线程&#xff0c;而shutdownNow()不等任务执行完就中断了线程。

本文就先讲到这里&#xff0c;想了解Java并发编程更多内容请参考&#xff1a;


  • Java并发编程札记-目录

推荐阅读
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 深入解析Java枚举及其高级特性
    本文详细介绍了Java枚举的概念、语法、使用规则和应用场景,并探讨了其在实际编程中的高级应用。所有相关内容已收录于GitHub仓库[JavaLearningmanual](https://github.com/Ziphtracks/JavaLearningmanual),欢迎Star并持续关注。 ... [详细]
  • Java多线程实现:从1到100分段求和并汇总结果
    本文介绍如何使用Java编写一个程序,通过10个线程分别计算不同区间的和,并最终汇总所有线程的结果。每个线程负责计算一段连续的整数之和,最后将所有线程的结果相加。 ... [详细]
  • 深入理解Java多线程并发处理:基础与实践
    本文探讨了Java中的多线程并发处理机制,从基本概念到实际应用,帮助读者全面理解并掌握多线程编程技巧。通过实例解析和理论阐述,确保初学者也能轻松入门。 ... [详细]
  • 本文详细介绍如何使用 Python 集成微信支付的三种主要方式:Native 支付、APP 支付和 JSAPI 支付。每种方式适用于不同的应用场景,如 PC 网站、移动端应用和公众号内支付等。 ... [详细]
  • 第十一章 Python基本数据类型及内置方法
    一、概述数据类型是用来记录事物状态的,而事物的状态是不断变化的(如:一个人年龄的增长(操作int类型),单个人名的修改(操作str类型),学生列表中增加学生(操作list类型)等) ... [详细]
  • 本文介绍了如何使用JavaScript的Fetch API与Express服务器进行交互,涵盖了GET、POST、PUT和DELETE请求的实现,并展示了如何处理JSON响应。 ... [详细]
  • Java项目分层架构设计与实践
    本文探讨了Java项目中应用分层的最佳实践,不仅介绍了常见的三层架构(Controller、Service、DAO),还深入分析了各层的职责划分及优化建议。通过合理的分层设计,可以提高代码的可维护性、扩展性和团队协作效率。 ... [详细]
  • 本文探讨了在Java中如何正确地将多个不同的数组插入到ArrayList中,避免所有数组在插入后变得相同的问题。我们将分析代码中的问题,并提供解决方案。 ... [详细]
  • 深入理解Vue.js:从入门到精通
    本文详细介绍了Vue.js的基础知识、安装方法、核心概念及实战案例,帮助开发者全面掌握这一流行的前端框架。 ... [详细]
  • 本文深入探讨了面向切面编程(AOP)的概念及其在Spring框架中的应用。通过详细解释AOP的核心术语和实现机制,帮助读者理解如何利用AOP提高代码的可维护性和开发效率。 ... [详细]
  • Redux入门指南
    本文介绍Redux的基本概念和工作原理,帮助初学者理解如何使用Redux管理应用程序的状态。Redux是一个用于JavaScript应用的状态管理库,特别适用于React项目。 ... [详细]
  • LeetCode 690:计算员工的重要性评分
    在解决LeetCode第690题时,我记录了详细的解题思路和方法。该问题要求根据员工的ID计算其重要性评分,包括直接和间接下属的重要性。本文将深入探讨如何使用哈希表(Map)来高效地实现这一目标。 ... [详细]
  • 本文介绍如何从字符串中移除大写、小写、特殊、数字和非数字字符,并提供了多种编程语言的实现示例。 ... [详细]
  • 利用Python实现自动化群发邮件
    本文详细介绍如何使用Python语言来实现邮件的自动群发功能,适合希望提高工作效率的技术爱好者和开发者。 ... [详细]
author-avatar
刘海龙_高安大城
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有