热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java并发编程札记(六)JUC线程池02ThreadPoolExecutor实现原理

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。简介为什么要使用线程池许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。


简介

为什么要使用线程池
许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请求,就创建一个线程来处理,表面看是没有问题的,但实际上存在着很严重的缺陷。服务器应用程序中经常出现的情况是请求处理的任务很简单但客户端的数目却是庞大的,这种情况下如果还是每收到一个请求就创建一个线程来处理它,服务器在创建和销毁线程所花费的时间和资源可能比处理客户端请求处理的任务花费的时间和资源更多。为了缓解服务器压力,需要解决频繁创建和销毁线程的问题。线程池可以实现这个需求。

什么是线程池
线程池可以看做是许多线程的集合。在没有任务时线程处于空闲状态,当请求到来,线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务。这样就实现了线程的重用。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

工作模型
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/workModel.png

工作模型中一共有三种队列:正在执行的任务队列,等待被执行的阻塞队列,等待被commit进阻塞队列中的任务队列。

Java中的线程池
Java中常用的线程池有三个,最出名的当然是ThreadPoolExecutor,除此之外还有ScheduledThreadPoolExecutor、ForkJoinPool。本文主要学习ThreadPoolExecutor的实现原理。


创建ThreadPoolExecutor线程池

强烈推荐使用Executors工厂方法创建线程池,如Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。本文主要学习如何手动配置,下面是ThreadPoolExecutor的一个构造方法。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize <0 ||maximumPoolSize <&#61; 0 ||maximumPoolSize 0)throw new IllegalArgumentException();if (workQueue &#61;&#61; null || threadFactory &#61;&#61; null || handler &#61;&#61; null)throw new NullPointerException();this.corePoolSize &#61; corePoolSize;this.maximumPoolSize &#61; maximumPoolSize;this.workQueue &#61; workQueue;this.keepAliveTime &#61; unit.toNanos(keepAliveTime);this.threadFactory &#61; threadFactory;this.handler &#61; handler;
}

ThreadPoolExecutor一共有四个构造方法&#xff0c;其他三个构造方法都是通过上述的构造方法来实现的。毫无疑问手动配置线程池的关键就是学好构造方法中的几个参数如何设置。这几个参数对应着ThreadPoolExecutor中的几个成员属性。


属性

corePoolSize与maximumPoolSize分别是核心池大小与最大池大小。在源码中的声明为
private volatile int corePoolSize;private volatile int maximumPoolSize;

当新任务在方法 execute(java.lang.Runnable) 中提交时&#xff0c;如果运行的线程少于 corePoolSize&#xff0c;则创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize&#xff0c;则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同&#xff0c;则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值&#xff08;如 Integer.MAX_VALUE&#xff09;&#xff0c;则允许池适应任意数量的并发任务。在大多数情况下&#xff0c;核心和最大池大小仅基于构造来设置&#xff0c;不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

workQueue是线程池工作模型中的阻塞队列&#xff0c;用于传输和保持提交的任务。在源码中的声明为private final BlockingQueue workQueue;

keepAliveTime是池中线程空闲时的活动时间。如果池中当前有多于 corePoolSize 的线程&#xff0c;则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止&#xff08;参见 getKeepAliveTime(java.util.concurrent.TimeUnit)&#xff09;。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动&#xff0c;则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下&#xff0c;保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0&#xff0c; allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

threadFactory是一个线程集合。线程池可以使用ThreadFactory创建新线程。如果没有另外说明&#xff0c;则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程&#xff0c;并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory&#xff0c;可以改变线程的名称、线程组、优先级、守护进程状态&#xff0c;等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程&#xff0c;则执行程序将继续运行&#xff0c;但不能执行任何任务。

handler是线程池拒绝策略&#xff0c;RejectedExecutionHandler类型的对象。当 Executor 已经关闭&#xff0c;并且 Executor 将有限边界用于最大线程和工作队列容量&#xff0c;且已经饱和时&#xff0c;在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下&#xff0c; execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略&#xff1a;


  • ThreadPoolExecutor.AbortPolicy &#xff0c;默认策略&#xff0c;处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy&#xff0c;线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制&#xff0c;能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy&#xff0c;不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy&#xff0c;如果执行程序尚未关闭&#xff0c;则位于工作队列头部的任务将被删除&#xff0c;然后重试执行程序&#xff08;如果再次失败&#xff0c;则重复此过程&#xff09;。

除了上面的几个属性外&#xff0c;ThreadPoolExecutor还有下面的几个参数。

//ctl是一个AtomicInteger类型的原子对象。ctl记录了线程池中的任务数量和线程池状态。
private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
//线程池的锁
private final ReentrantLock mainLock &#61; new ReentrantLock();
//所有工作的线程
private final HashSet workers &#61; new HashSet();
//支持的等待condition
private final Condition termination &#61; mainLock.newCondition();
//线程池中线程数量曾经达到过的最大值
private int largestPoolSize;
//已完成任务数量
private long completedTaskCount;
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;

排队策略

排队有三种通用策略&#xff1a;
SynchronousQueue
它将任务直接传输给工作队列workers&#xff0c;而不保持任务。如果不存在空闲线程&#xff0c;则会新建一个线程来执行任务。比如&#xff0c;在Executors.newCachedThreadPool()方法中使用的就是此策略。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());
}

LinkedBlockingQueue
有界队列&#xff0c;使用此队列会导致在所有corePoolSize线程都忙时新任务在队列中等待。这样&#xff0c;创建的线程就不会超过corePoolSize。比如&#xff0c;在Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()方法中使用的就是此策略。

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}

ArrayBlockingQueue
有界队列&#xff0c;没见到在哪里用到了这种策略。


线程池状态

源码已经告诉了我们线程池有几个状态。

private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS &#61; Integer.SIZE - 3;
private static final int CAPACITY &#61; (1 <1;// runState is stored in the high-order bits
private static final int RUNNING &#61; -1 <private static final int SHUTDOWN &#61; 0 <private static final int STOP &#61; 1 <private static final int TIDYING &#61; 2 <private static final int TERMINATED &#61; 3 <

可以看出&#xff0c;一共有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。ctl对象一共32位&#xff0c;高3位保存线程池状态信息&#xff0c;后29位保存线程池容量信息。线程池的初始化状态是RUNNING&#xff0c;在源码中体现为private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));


状态高三位工作队列workers中的任务阻塞队列workQueue中的任务未添加的任务
RUNNING111继续处理继续处理添加
SHUTDOWN000继续处理继续处理不添加
STOP001尝试中断不处理不添加
TIDYING010处理完了如果由SHUTDOWN - TIDYING &#xff0c;那就是处理完了&#xff1b;如果由STOP - TIDYING &#xff0c;那就是不处理不添加
TERMINATED011同TIDYING同TIDYING同TIDYING

各个状态的转换图如下所示
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/status.png


执行任务

execute(Runnable)

/** 在将来某个时间执行给定任务。* 可以在新线程中或者在现有池线程中执行该任务。* 如果无法将任务提交执行&#xff0c;或者因为此执行程序已关闭&#xff0c;或者因为已达到其容量&#xff0c;* 则该任务由当前 RejectedExecutionHandler 处理。*/
public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();int c &#61; ctl.get();//分三种情况处理//case1&#xff1a;如果线程池中运行的线程数量if (workerCountOf(c) //创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的if (addWorker(command, true))return;c &#61; ctl.get();}//case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();//再次检查线程池的状态&#xff0c;如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务if (! isRunning(recheck) && remove(command))//该任务由当前 RejectedExecutionHandler 处理reject(command);//如果线程池中运行的线程数量为0else if (workerCountOf(recheck) &#61;&#61; 0)//则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。addWorker(null, false);}//case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败else if (!addWorker(command, false))//该任务由当前 RejectedExecutionHandler 处理reject(command);
}

看完后&#xff0c;我们知道execute()分三种情况处理任务

case1&#xff1a;如果线程池中运行的线程数量 case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中&#xff0c;就再次检查线程池的状态&#xff0c;1.如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。2.否则如果线程池中运行的线程数量为0&#xff0c;则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。
case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。

submit

/*** 提交一个 Runnable 任务用于执行&#xff0c;并返回一个表示该任务的 Future。* 该 Future 的 get 方法在 成功 完成时将会返回 null。*/
public Future submit(Runnable task) {if (task &#61;&#61; null) throw new NullPointerException();RunnableFuture ftask &#61; newTaskFor(task, null);execute(ftask);return ftask;
}

可以看到此方法是通过调用execute(Runnable)实现的。


关闭线程池

ThreadPoolExecutor提供了shutdown()和shutdownNow()两个方法来关闭线程池。shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;但是不接受新任务。shutdownNow()尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。

shutdown()

/** 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;不接受新任务。* 如果已经关闭&#xff0c;则调用没有其他作用。*/
public void shutdown() {final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.将线程池状态设为SHUTDOWNadvanceRunState(SHUTDOWN);//step4.中断所有空闲线程interruptIdleWorkers();//step5.钩子函数&#xff0c;没有执行任何操作onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();
}

将shutdown()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.将线程池状态设为SHUTDOWN。一旦将线程池状态设为SHUTDOWN&#xff0c;就不能像线程池中添加新任务了。
step4.中断所有空闲线程
step5.钩子函数&#xff0c;没有执行任何操作
step6.释放锁
step7.将线程池状态设置为TERMINATED

shutdownNow()

/** 尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。* /
public List shutdownNow() {List tasks;final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.尝试停止所有的活动执行任务advanceRunState(STOP);//step4.暂停等待任务的处理interruptWorkers();//step5.获取等待执行的任务列表tasks &#61; drainQueue();} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();//step8.返回等待执行的任务列表return tasks;
}

将shutdownNow()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.尝试停止所有的活动执行任务
step4.暂停等待任务的处理
step5.获取等待执行的任务列表
step6.释放锁
step7.将线程池状态设置为TERMINATED
step8.返回等待执行的任务列表

shutdown()和shutdownNow()的区别


  • 调用shutdown()后&#xff0c;线程池状态立刻变为SHUTDOWN&#xff0c;而调用shutdownNow()&#xff0c;线程池状态立刻变为STOP。
  • shutdown()通过中断空闲线程、不接受新任务的方式按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;shutdownNow()无差别地停止所有的活动执行任务&#xff0c;暂停等待任务的处理。也就是说&#xff0c;shutdown()等待任务执行完才中断线程&#xff0c;而shutdownNow()不等任务执行完就中断了线程。

本文就先讲到这里&#xff0c;想了解Java并发编程更多内容请参考&#xff1a;


  • Java并发编程札记-目录

推荐阅读
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • Android 渐变圆环加载控件实现
    本文介绍了如何在 Android 中创建一个自定义的渐变圆环加载控件,该控件已在多个知名应用中使用。我们将详细探讨其工作原理和实现方法。 ... [详细]
  • 深入解析JVM垃圾收集器
    本文基于《深入理解Java虚拟机:JVM高级特性与最佳实践》第二版,详细探讨了JVM中不同类型的垃圾收集器及其工作原理。通过介绍各种垃圾收集器的特性和应用场景,帮助读者更好地理解和优化JVM内存管理。 ... [详细]
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • 本文详细探讨了Netty中Future及其子类的设计与实现,包括其在并发编程中的作用和具体应用场景。我们将介绍Future的继承体系、关键方法的实现细节,并讨论如何通过监听器和回调机制来处理异步任务的结果。 ... [详细]
  • 本文介绍了如何通过 Maven 依赖引入 SQLiteJDBC 和 HikariCP 包,从而在 Java 应用中高效地连接和操作 SQLite 数据库。文章提供了详细的代码示例,并解释了每个步骤的实现细节。 ... [详细]
author-avatar
刘海龙_高安大城
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有