热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

如何彻底搞懂jdk8线程池

这篇文章将为大家详细讲解有关如何彻底搞懂jdk8线程池,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有

这篇文章将为大家详细讲解有关如何彻底搞懂jdk8线程池,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

顶层设计,定义执行接口

Interface Executor(){
    void execute(Runnable command);

}

ExecutorService,定义控制接口

interface ExecutorService extends Executor{
    
}

如何彻底搞懂jdk8线程池

抽象实现ExecutorService中的大部分方法

abstract class AbstractExecutorService implements ExecutorService{    //此处把ExecutorService中的提交方法都实现了
}

如何彻底搞懂jdk8线程池

我们看下提交中的核心

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // ① 
            //核心线程数没有满就继续添加核心线程
            if (addWorker(command, true)) // ②
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // ③
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))// ④
                reject(command); //⑦
            else if (workerCountOf(recheck) == 0) // ⑤
                //如果worker为0,则添加一个非核心worker,所以线程池里至少有一个线程
                addWorker(null, false);// ⑥
        }
        //队列满了以后,添加非核心线程
        else if (!addWorker(command, false))// ⑧
            reject(command);//⑦
    }

如何彻底搞懂jdk8线程池

这里就会有几道常见的面试题

1,什么时候用核心线程,什么时候启用非核心线程?

添加任务时优先使用核心线程,核心线程满了以后,任务放入队列中。只要队列不填满,就一直使用核心线程执行任务(代码①②)。

当队列满了以后开始使用增加非核心线程来执行队列中的任务(代码⑧)。

2,0个核心线程,2个非核心线程,队列100,添加99个任务是否会执行?

会执行,添加队列成功后,如果worker的数量为0,会添加非核心线程执行任务(见代码⑤⑥)

3,队列满了会怎么样?

队列满了,会优先启用非核心线程执行任务,如果非核心线程也满了,那就执行拒绝策略。

4,submit 和execute的区别是?

submit将执行任务包装成了RunnableFuture,最终返回了Future,executor 方法执行无返回值。

addworker实现

ThreadPoolExecutor extends AbstractExecutorService{
    //保存所有的执行线程(worker)
    HashSet workers = new HashSet();
    //存放待执行的任务,这块具体由指定的队列实现

    BlockingQueue workQueue;
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
    }
    //添加执行worker
    private boolean addWorker(Runnable firstTask, boolean core) {
        //这里每次都会基础校验和cas校验,防止并发无法创建线程,
        retry:
        for(;;){
            for(;;){
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        try{
            //创建一个worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            try{
                //加锁校验,添加到workers集合中
                workers.add(w);
            }
            //添加成功,将对应的线程启动,执行任务
            t.start();
        }finally{
             //失败执行进行释放资源
            addWorkerFailed(Worker w) 
        }
  
       
    }
    //Worker 是对任务和线程的封装
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        //线程启动后会循环执行任务
        public void run() {
            runWorker(this);
        }

    }

    //循环执行
    final void runWorker(Worker w) {
        try{
            while (task != null || (task = getTask()) != null) {
                //执行前的可扩展点
                beforeExecute(wt, task);
                try{
                     //执行任务
                    task.run();
                }finally{
                    //执行后的可扩展点,这块也把异常给吃了
                    afterExecute(task, thrown);
                }
            }
            //这里会对执行的任务进行统计
        }finally{
             //异常或者是循环退出都会走这里
             processWorkerExit(w, completedAbruptly);
        }
    }
    //获取执行任务,此处决定runWorker的状态
    private Runnable getTask() {
        //worker的淘汰策略:允许超时或者工作线程>核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //满足淘汰策略且...,就返回null,交由processWorkerExit去处理线程
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
        // 满足淘汰策略,就等一定的时间poll(),不满足,就一直等待take()
        Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
    }
    //处理任务退出(循环获取不到任务的时候)
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //异常退出的,不能调整线程数的
        if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusted
            decrementWorkerCount();
        
        //不管成功或失败,都执行以下逻辑
        //1,计数,2,减去一个线程

        completedTaskCount += w.completedTasks;
        //将线程移除,并不关心是否非核心
        workers.remove(w);
        //如果是还是运行状态

        if (!completedAbruptly) {
            //正常终止的,处理逻辑
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //核心线程为0 ,最小值也是1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //总线程数大于min就不再添加
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }        //异常退出一定还会添加worker,正常退出一般不会再添加线程,除非核心线程数为0
        addWorker(null, false);
    }
    
}

如何彻底搞懂jdk8线程池

这里涉及到几个点:

1,任务异常以后虽然有throw异常,但是外面有好几个finally代码;

2,在finally中,进行了任务的统计以及worker移除;

3,如果还有等待处理的任务,最少添加一个worker(不管核心线程数是否为0)

这里会引申出来几个面试题:

1, 线程池中核心线程数如何设置?

cpu密集型:一般为核心线程数+1,尽可能减少cpu的并行;

IO密集型:可以设置核心线程数稍微多些,将IO等待期间的空闲cpu充分利用起来。

2,线程池使用队列的意义?

a)线程的资源是有限的,且线程的创建成本比较高;

b)  要保证cpu资源的合理利用(不能直接给cpu提一堆任务,cpu处理不过来,大家都慢了)

c) 利用了削峰填谷的思想(保证任务执行的可用性);

d) 队列过大也会把内存撑爆。

3,为什么要用阻塞队列?而不是非阻塞队列?

a) 利用阻塞的特性,在没有任务时阻塞一定的时间,防止资源被释放(getTask和processWorkExit);

b) 阻塞队列在阻塞时,CPU状态是wait,等有任务时,会被唤醒,不会占用太多的资源;

线程池有两个地方:

1,在execute方法中(提交任务时),只要工作线程为0,就至少添加一个Worker;

2,在processWorkerExit中(正常或异常结束时),只要有待处理的任务,就会增加Worker

所以正常情况下线程池一定会保证所有任务的执行。

我们在看下ThreadPoolExecutor中以下几个方法

public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

确保了核心线程数必须是满的,这些方法特别是在批处理的时候,或者动态调整核心线程数的大小时很有用。

我们再看下Executors中常见的创建线程池的方法:

一、newFixedThreadPool 与newSingleThreadExecutor

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
     public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

特点:

1,核心线程数和最大线程数大小一样(唯一不同的是,一个是1,一个是自定义);

2,队列用的是LinkedBlockingQueue(长度是Integer.Max_VALUE)

当任务的生产速度大于消费速度后,很容易将系统内存撑爆。

二、 newCachedThreadPool 和

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

特点:最大线程数为Integer.MAX_VALUE

当任务提交过多时,线程创建过多容易导致无法创建

三、 newWorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

这个主要是并行度,默认为cpu的核数。

四、newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

封装起来的要么最大线程数不可控,要么队列长度不可控,所以阿里规约里也不建议使用Executors方法创建线程池。

ps:

生产上使用线程池,最好是将关键任务和非关键任务分开设立线程池,非关键业务影响关键业务的执行。

关于如何彻底搞懂jdk8线程池就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


推荐阅读
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 本文探讨了如何将个人经历,特别是非传统的职业路径,转化为职业生涯中的优势。通过作者的亲身经历,展示了舞蹈生涯对商业思维的影响。 ... [详细]
  • 本文详细介绍了 `org.apache.tinkerpop.gremlin.structure.VertexProperty` 类中的 `key()` 方法,并提供了多个实际应用的代码示例。通过这些示例,读者可以更好地理解该方法在图数据库操作中的具体用途。 ... [详细]
  • 问题场景用Java进行web开发过程当中,当遇到很多很多个字段的实体时,最苦恼的莫过于编辑字段的查看和修改界面,发现2个页面存在很多重复信息,能不能写一遍?有没有轮子用都不如自己造。解决方式笔者根据自 ... [详细]
  • 本文详细介绍了 Java 中 org.w3c.dom.Node 类的 isEqualNode() 方法的功能、参数及返回值,并通过多个实际代码示例来展示其具体应用。此方法用于检测两个节点是否相等,而不仅仅是判断它们是否为同一个对象。 ... [详细]
  • 根据官方定义,RxJava是一种用于异步编程和可观察数据流的API。其核心特性在于流式处理能力和丰富的操作符支持。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • POJ 2482 星空中的星星:利用线段树与扫描线算法解决
    在《POJ 2482 星空中的星星》问题中,通过运用线段树和扫描线算法,可以高效地解决星星在窗口内的计数问题。该方法不仅能够快速处理大规模数据,还能确保时间复杂度的最优性,适用于各种复杂的星空模拟场景。 ... [详细]
  • 长期从事ABAP开发工作的专业人士,在面对行业新趋势时,往往需要重新审视自己的发展方向。本文探讨了几位资深专家对ABAP未来走向的看法,以及开发者应如何调整技能以适应新的技术环境。 ... [详细]
  • 本文详细介绍了JQuery Mobile框架中特有的事件和方法,帮助开发者更好地理解和应用这些特性,提升移动Web开发的效率。 ... [详细]
  • 深入理解Java多线程与并发机制
    本文探讨了Java多线程和并发机制的核心概念,包括多线程类的分类、执行器框架、并发容器及控制工具。通过详细解析这些组件,帮助开发者更好地理解和应用多线程技术。 ... [详细]
  • 面试题总结_2019年全网最热门的123个Java并发面试题总结
    面试题总结_2019年全网最热门的123个Java并发面试题总结 ... [详细]
  • Java 并发编程:RunnableScheduledFuture 接口详解
    本文深入解析了 Java 并发编程中 RunnableScheduledFuture 接口的源代码及其在标准线程池中的应用。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
author-avatar
冰妞qb_424
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有