热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

聊聊面试中的Java线程池

​背景关于Java的线程池我想大家肯定不会陌生,在工作中或者自己平时的学习中多多少少都会用到,那你真的有了解过底层的实现原理吗?还是说只停留在用的阶段呢?而且关于Java线程池也是

​背景

关于 Java 的线程池我想大家肯定不会陌生,在工作中或者自己平时的学习中多多少少都会用到,那你真的有了解过底层的实现原理吗?还是说只停留在用的阶段呢?而且关于 Java 线程池也是在面试中的一个高频的面试题,就像 HashMap 的实现原理一样,基本上面试必问,估计都已经被问烂大街了。


题外话:HashMap 的实现原理真的已经被问烂了,在我自身的多次面试中都不知道被问了几遍了,有的时候想想很奇怪,为什么这个被问的烂大街的问题还是会一直被问呢?但是从面试官的角度来想一下,如果一个被问的都烂大街的问题你都不好好准备对待,那怎么能好好的对待工作呢(个人愚见)。



常用的几种线程池

我们先来看下常用的几种线程池的创建方式,以及底层采用的实现原理

单个线程: Executors.newSingleThreadExecutor();

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }

缓存线程: Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

固定线程Executors.newFixedThreadPool(2);

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

定时线程: Executors.newScheduledThreadPool(3);(父类中)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

核心 ThreadPoolExecutor

通过上面的几个线程池的底层实现,我们可以发现底层都是通过 ThreadPoolExecutor 类来实现的,只是参数不一样,那我们就很有必要来看一下ThreadPoolExecutor 这个类了

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize keepAliveTime <0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

通过 JDK 的源码我们可以看到 ThreadPoolExecutor 在 Java 的 concurrent 包下面,并且有四个构造方法,下面依次介绍下各个参数的含义:

•corePoolSize: 核心线程数的大小•maximumPoolSize: 线程池中允许的最大线程数•keepAliveTime: 空闲线程允许的最大的存活时间•unit: 存活时间的单位•workQueue: 阻塞任务队列•threadFactory: 线程工厂用来创建线程•handler: 拒绝策略,针对当队列满了时新来任务的处理方式

通过上面参数的分析,我们可以知道,单个线程的线程池就是线程池中只有一个线程负责任务,所以 corePoolSize 和 maximumPoolSize 的数值都是为 1;当这个线程出现任何异常后,线程池会自动创建一个线程,始终保持线程池中有且只有一个存活的线程。而且其他线程池也只是参数的设置不一样而已。 我们还需要知道几个常见的线程池类和接口的关系,以及一些方法,如下图


ThreadPoolExecutor 继承 AbstractExecutorServiceAbstractExecutorService 实现 ExecutorService, ExecutorService 继承 Executor



源码分析

根据源码可以发现整个线程池大致分为 3 个部分,1. 是创建 worker 线程,2. 添加任务到 workQueue; 3.worker 线程执行具体任务

创建 worker 线程,现在我们来看下核心的 execute(Runnable command) 方法,如果工作线程小于指定的核心线程数时会尝试去创建新的线程,

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果工作线程比核心线程数少,则创建新线程 if (workerCountOf(c) if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}

再看下addWorker(Runnable firstTask, boolean core) 方法

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}

添加任务到 workQueue,这个阻塞队列内部的方法

public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() enqueue(node); c = count.getAndIncrement(); if (c + 1 notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0;}

worker 线程执行具体任务,阻塞或者超时去获取队列中的任务,进行执行

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //阻塞循环获取任务 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}

在刚刚创建线程池的时候,内部线程的数量是 0,当首个任务进行添加的时候,会根据参数的配置进行线程的创建,并随着任务数的增加,会逐渐创建新的线程直到不能创建新的线程为止。不能创建新的线程后,会将来的任务存放到阻塞队列中,让空闲的线程去处理。当没有空闲线程并且队列满了时候就会采用拒绝策略去丢弃或者其他策略来处理。 拒绝策略主要有四种,不同的拒绝策略有不同的使用场景,需要根据情况决定使用。

CallerRunsPolicy : 调用线程处理任务•AbortPolicy : 抛出异常•DiscardPolicy : 直接丢弃•DiscardOldestPolicy : 丢弃队列中最老的任务,执行新任务


小结

线程池在工作中的使用必不可少,如何优雅的使用线程池能很大程度的提升性能和效率。根据实际的应用场景,配置合适的线程池参数可以很大的提升项目的性能,也可以充分利用服务器的性能。

 



 



 

Java 极客技术公众号,是由一群热爱 Java 开发的技术人组建成立,专注分享原创、高质量的 Java 文章。如果您觉得我们的文章还不错,请帮忙赞赏、在看、转发支持,鼓励我们分享出更好的文章。

关注公众号,大家可以在公众号后台回复“博客园”,免费获得作者 Java 知识体系/面试必看资料。


 



推荐阅读
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 本文提供了使用Java实现Bellman-Ford算法解决POJ 3259问题的代码示例,详细解释了如何通过该算法检测负权环来判断时间旅行的可能性。 ... [详细]
  • 实体映射最强工具类:MapStruct真香 ... [详细]
  • 在 Flutter 开发过程中,开发者经常会遇到 Widget 构造函数中的可选参数 Key。对于初学者来说,理解 Key 的作用和使用场景可能是一个挑战。本文将详细探讨 Key 的概念及其应用场景,并通过实例帮助你更好地掌握这一重要工具。 ... [详细]
  • ElasticSearch 集群监控与优化
    本文详细介绍了如何有效地监控 ElasticSearch 集群,涵盖了关键性能指标、集群健康状况、统计信息以及内存和垃圾回收的监控方法。 ... [详细]
  • 深入理解Java泛型:JDK 5的新特性
    本文详细介绍了Java泛型的概念及其在JDK 5中的应用,通过具体代码示例解释了泛型的引入、作用和优势。同时,探讨了泛型类、泛型方法和泛型接口的实现,并深入讲解了通配符的使用。 ... [详细]
  • 本文介绍如何在Java项目中使用Log4j库进行日志记录。我们将详细说明Log4j库的引入、配置及简单应用,帮助开发者快速上手。 ... [详细]
  • 深入探讨CPU虚拟化与KVM内存管理
    本文详细介绍了现代服务器架构中的CPU虚拟化技术,包括SMP、NUMA和MPP三种多处理器结构,并深入探讨了KVM的内存虚拟化机制。通过对比不同架构的特点和应用场景,帮助读者理解如何选择最适合的架构以优化性能。 ... [详细]
  • 在多线程编程环境中,线程之间共享全局变量可能导致数据竞争和不一致性。为了解决这一问题,Linux提供了线程局部存储(TLS),使每个线程可以拥有独立的变量副本,确保线程间的数据隔离与安全。 ... [详细]
  • 本文介绍了几种不同的编程方法来计算从1到n的自然数之和,包括循环、递归、面向对象以及模板元编程等技术。每种方法都有其特点和适用场景。 ... [详细]
  • 本文探讨了在Java多线程环境下,如何确保具有相同key值的线程能够互斥执行并按顺序输出结果。通过优化代码结构和使用线程安全的数据结构,我们解决了线程同步问题,并实现了预期的并发行为。 ... [详细]
  • Startup 类配置服务和应用的请求管道。Startup类ASP.NETCore应用使用 Startup 类,按照约定命名为 Startup。 Startup 类:可选择性地包括 ... [详细]
  • Kubernetes 持久化存储与数据卷详解
    本文深入探讨 Kubernetes 中持久化存储的使用场景、PV/PVC/StorageClass 的基本操作及其实现原理,旨在帮助读者理解如何高效管理容器化应用的数据持久化需求。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 深入解析Java虚拟机(JVM)架构与原理
    本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
author-avatar
醣荳_448
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有