热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

浅谈java.util.concurrent包中的线程池和消息队列

这篇文章主要介绍了浅谈java.util.concurrent包中的线程池和消息队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1.java并发包介绍

JDK5.0(JDK1.5更名后)以后的版本引入高级并发特性,大多数的特性在java.util.concurrent包中,是专门用于多线程编程的,充分利用了现代多处理器和多核心系统的功能以编写大规模并发应用程序。主要包括原子量、并发集合、同步器、可重入锁,并对线程池的构造提供了强力的支持

2.线程池

java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

假设服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能,减少创建和销毁线程所需消耗的时间。

一个线程池由以下四个基本部分组成:

  1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2. 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

线程池技术正是关心如何缩短或调整T1,T3时间从而提高服务器程序性能的技术。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,免去了线程创建和销毁的开销。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理100000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,
一般线程池大小是远小于100000。所以利用线程池的服务器程序不会为了创建100000而在处理请求时浪费时间,从而提高效率。

线程池的五种创建方式

  1. Single Thread Executor:只有一个线程的线程池,因此所提交的任务是顺序执行,Executors.newSingleThreadExecutor();
  2. Cached Thread Pool:线程池里有很多线程需同时进行,旧的可用线程将被新的任务触发从而重新执行,如果线程超过60秒内没有执行,那么将被终止并从池中删除Executors.newCachedThreadPool();
  3. Fixed Thread Pool:拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待,Executors.newFixedThreadPool(10);在构造函数中的参数10是线程池的大小,你可以随意设置,也可以和cpu的数量保持一致,获取cpu的数量int cpuNums = Runtime.getRuntime().availableProcessors();
  4. Scheduled Thread Pool:用来调度即将执行的任务的线程池Executors.newScheduledThreadPool();
  5. Sing Thread Scheduled Pool:只有一个线程,用来调度任务在指定时间执行Executors.newSingleThreadScheduledExecutor();

3.线程池的使用

以下用Fixed Thread Pool作为示范,提供一个使用参考

LogNumVo

package com.ithzk.threadpool;

/**
 * 用作返回 执行的数量的
 * @author hzk
 * @date 2018/3/29
 */
public class LogNumVo {
  private static final long serialVersiOnUID= -5541722936350755569L;
  private Integer dataNum;
  private Integer successNum;
  private Integer waitNum;

  public Integer getDataNum() {
    return dataNum;
  }
  public void setDataNum(Integer dataNum) {
    this.dataNum = dataNum;
  }
  public Integer getSuccessNum() {
    return successNum;
  }
  public void setSuccessNum(Integer successNum) {
    this.successNum = successNum;
  }
  public Integer getWaitNum() {
    return waitNum;
  }
  public void setWaitNum(Integer waitNum) {
    this.waitNum = waitNum;
  }
}
 

DealObject

package com.ithzk.threadpool;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class DealObject {

  private Integer identifyId;

  private String data;

  public DealObject(Integer identifyId, String data) {
    this.identifyId = identifyId;
    this.data = data;
  }

  public DealObject() {
  }

  public Integer getIdentifyId() {
    return identifyId;
  }

  public void setIdentifyId(Integer identifyId) {
    this.identifyId = identifyId;
  }

  public String getData() {
    return data;
  }

  public void setData(String data) {
    this.data = data;
  }
}

AbstractCalculateThread

package com.ithzk.threadpool;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class AbstractCalculateThread implements Callable {

  protected Collection insertList;

  protected CountDownLatch countd;

  protected String threadCode;

  protected String batchNumber;

  public Collection getInsertList() {
    return insertList;
  }

  public void setInsertList(Collection insertList) {
    this.insertList = insertList;
  }

  public CountDownLatch getCountd() {
    return countd;
  }

  public void setCountd(CountDownLatch countd) {
    this.countd = countd;
  }

  public String getThreadCode() {
    return threadCode;
  }

  public void setThreadCode(String threadCode) {
    this.threadCode = threadCode;
  }

  public String getBatchNumber() {
    return batchNumber;
  }

  public void setBatchNumber(String batchNumber) {
    this.batchNumber = batchNumber;
  }


  public AbstractCalculateThread() {
    super();
  }

  public AbstractCalculateThread(Collection insertList, CountDownLatch countd, String threadCode,String batchNumber) {
    super();
    this.insertList = insertList;
    this.countd = countd;
    this.threadCode = threadCode;
    this.batchNumber = batchNumber;
  }

  public String call() throws Exception {
    return null;
  }
}

CalculateDealThread

package com.ithzk.threadpool;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class CalculateDealThread extends AbstractCalculateThread {

  private ExecutorPool executorPool = SpringContextUtil.getBean(ExecutorPool.class);

  @Override
  public String call() throws Exception {
    try {
      System.out.println("========开始跑线程【"+threadCode+"】");
      return executorPool.syncBatchDealObject(insertList,batchNumber);
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("========开始跑线程【"+threadCode+"】:"+e.getMessage());
    }finally {
      countd.countDown();
    }
    return null;
  }

  public CalculateDealThread() {
    super();
  }

  public CalculateDealThread(Collection insertList, CountDownLatch countd, String threadCode,String batchNumber) {
    super(insertList, countd, threadCode, batchNumber);
  }

}

ExecutorPool

package com.ithzk.threadpool;

import java.util.*;
import java.util.concurrent.*;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class ExecutorPool {

  /**
   * 模拟需要处理数据的大小
   */
  private static final int ARRAY_COUNT = 50000;
  /**
   * 开启多线程处理的条件
   */
  private static final int MULTI_THREAD_STARTCOUNT = 10000;
  /**
   * 批量处理的大小
   */
  private static final int BATCH_DEAL_SIZE = 100;
  /**
   * 每次开启线程数量
   */
  public static final int THREAD_POOL_NUM=10;

  public static void main(String[] args){
    testExecutorPool();
  }

  public static void testExecutorPool(){
    ArrayList dealObjects = new ArrayList();
    for (int i = 0;i MULTI_THREAD_STARTCOUNT) {
      try {
        System.out.println("===================dataNum > 1000 | Multiple Thread Run=======================");
        // 每次新增处理的条数
        int batchInsertSize = BATCH_DEAL_SIZE;
        // 定义保存的线程池
        ExecutorService executorInsert = Executors.newFixedThreadPool(THREAD_POOL_NUM);
        // 定义保存过程中返回的线程执行返回参数
        List> futureListIsert = new ArrayList>();
        // 线程 修改list
        List> listDealObjects = new ArrayList>();
        List> listLiveSyncLogInsert = pointDateClassify(dealObjects, batchInsertSize, listDealObjects);
        if (null != listLiveSyncLogInsert && !listDealObjects.isEmpty()) {
          System.out.println("===================切割后的大小:"+listLiveSyncLogInsert.size()+"=======================");
          //配合使用CountDownLatch为了保证在执行完所有子程序之后再执行主程序
          CountDownLatch countd = new CountDownLatch(listLiveSyncLogInsert.size());
          for (int j = 0; j  insert = listLiveSyncLogInsert.get(j);
            Future future = executorInsert.submit(new CalculateDealThread(insert.values(), countd,"executor_pool_test_thread", null));
            futureListIsert.add(future);
          }
        }
        // 等待线程执行完成
        executorInsert.shutdown();
        for (Future future : futureListIsert) {
          String json = future.get();
          if (null != json && !"".equals(json)) {
            将返回的json格式数据转换为实体类 进行业务记录
            LogNumVo logNumVo = JSON.toJavaObject(JSON.parseObject(json),LogNumVo.class);
            successNum += logNumVo.getSuccessNum();
            waitNum += logNumVo.getWaitNum();
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }

    }
  }

  /**
   * 拆分线程数
   * 假设集合中有50000个元素 则按照100个一组切分 可切分为500组
   * 即每个线程一次处理一组(100个元素)
   *
   * @author
   * @param lPostUploadIntegralList
   * @param batchInsertSize
   * @param listPostUploadIsert
   */
  @SuppressWarnings("all")
  public static List> pointDateClassify(List lPostUploadIntegralList,int batchInsertSize, List> listJSONObjectUpdate) {
    List> listLiveSyncLogInsert = new Vector>();
    // 新增数据list
    List integralListInsert = lPostUploadIntegralList;

    System.out.println("============integralListInsert.size()=====:" + integralListInsert.size());
    // 拆分数据(拆成多个List)
    int inserti = 0;
    if (integralListInsert != null && integralListInsert.size() > 0) {
      ConcurrentHashMap integralListIns = null;
      for (int l = 0; l ();
        }
        integralListIns.put(integralListInsert.get(l).getIdentifyId(), integralListInsert.get(l));
        inserti++;
        if ((inserti % batchInsertSize) == 0) {
          listLiveSyncLogInsert.add(integralListIns);
          integralListIns = null;
        } else {
          // 最后100条或不足100条数据
          if ((l + 1) == integralListInsert.size()) {
            listLiveSyncLogInsert.add(integralListIns);
          }
        }
      }
    }
    System.out.println("=============listPostUploadInsert.size()====:" + listLiveSyncLogInsert.size());
    return listLiveSyncLogInsert;
  }

  /**
   * 多线程保存数据至数据库
   */
  public String syncBatchDealObject(Collection insertList,String batchNumber) {
    int successNum = 0, waitNum = 0;
    Date currentDate = new Date(System.currentTimeMillis());
    for (DealObject dealObject : insertList) {
      try {
        int icount = syncDealObject(dealObject,currentDate);
        if(icount > 0){
          successNum ++;
        }else {
          waitNum ++;
        }
      } catch (Exception e) {
        e.printStackTrace();
        ++waitNum;
      }
    }
    LogNumVo logNum = new LogNumVo();
    logNum.setDataNum(0);
    logNum.setSuccessNum(successNum);
    logNum.setWaitNum(waitNum);
    // 将记录实体类转为json格式反馈给线程池
    return JSON.toJSONString(logNum);
  }

  /**
   * 处理数据业务
   * @param dealObject
   * @param currentDate
   * @return
   */
  private int syncDealObject(DealObject dealObject,Date currentDate){
    int successNum = 0;
    //业务处理逻辑
    if(null != dealObject.getData()){
      successNum++;
    }
    return successNum;
  }
}

4.BlockingQueue

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

插入:

add(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常

offer(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则返回false.

put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.

读取:

poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其他:

int remainingCapacity()
返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
该数量总是等于此队列的初始容量,小于队列的当前 size(返回队列剩余的容量)。
注意,不能总是通过检查 remainingcapacity 来断定试图插入一个元素是否成功,因为可能是另一个线程将插入或移除某个元

素。
boolean remove(Object o)
从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true

public boolean contains(Object o)
查看队列是否存在这个元素,存在返回true

int drainTo(Collection<&#63; super E> c)
传入的集合中的元素,如果在队列中存在,那么将队列中的元素移动到集合中

int drainTo(Collection<&#63; super E> c, int maxElements)
和上面方法的区别在于,制定了移动的数量

以下是一个BlockQueue的基本使用参考:

Producer

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

/**
 * @author hzk
 * @date 2018/3/31
 */
public class Producer implements Runnable{

  BlockingQueue blockingQueue;

  public Producer(BlockingQueue blockingQueue) {
    this.blockingQueue = blockingQueue;
  }

  @Override
  public void run() {
    try {
      String threadIdentify = "A Producer,生产线程"+Thread.currentThread().getName();
      blockingQueue.put(threadIdentify);
      System.out.println("Produce success! Thread:"+Thread.currentThread().getName());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Consumer

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

/**
 * @author hzk
 * @date 2018/3/31
 */
public class Consumer implements Runnable{

  BlockingQueue blockingQueue;

  public Consumer(BlockingQueue blockingQueue) {
    this.blockingQueue = blockingQueue;
  }

  @Override
  public void run() {
    try {
      String cOnsumer= Thread.currentThread().getName();
      System.out.println("Current Consumer Thread:"+consumer);
      //如果队列为空会阻塞当前线程
      String take = blockingQueue.take();
      System.out.println(consumer + " consumer get a product:"+take);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

BlockTest

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author hzk
 * @date 2018/3/31
 */
public class BlockTest {
  
  public static void main(String[] args) throws InterruptedException {
    // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
    // BlockingQueue blockingQueue = new LinkedBlockingQueue();
    // BlockingQueue blockingQueue = new ArrayBlockingQueue(2);
    BlockingQueue blockingQueue = new LinkedBlockingQueue(2);
    Consumer cOnsumer= new Consumer(blockingQueue);
    Producer producer = new Producer(blockingQueue);
    for (int i = 0; i <3; i++) {
      new Thread(producer, "Producer" + (i + 1)).start();
    }
    for (int i = 0; i <5; i++) {
      new Thread(consumer, "Consumer" + (i + 1)).start();
    }

    Thread.sleep(5000);

    new Thread(producer, "Producer" + (5)).start();

  }
}

BlockingQueue有四个具体的实现类,常用的两种实现类为:

  1. ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
  2. LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制。

若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

LinkedBlockingQueue和ArrayBlockingQueue区别

LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 本文探讨了 RESTful API 和传统接口之间的关键差异,解释了为什么 RESTful API 在设计和实现上具有独特的优势。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文探讨了在 PHP 中处理 JSON 编码时中文字符显示为 Unicode 转义序列的问题,并提供了多种有效的解决方法,包括使用正则表达式替换、URL 编码以及利用 PHP 5.4 及以上版本提供的 JSON_UNESCAPED_UNICODE 选项。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • 本文详细介绍了中央电视台电影频道的节目预告,并通过专业工具分析了其加载方式,确保用户能够获取最准确的电视节目信息。 ... [详细]
  • Composer Registry Manager:PHP的源切换管理工具
    本文介绍了一个用于Composer的源切换管理工具——Composer Registry Manager。该项目旨在简化Composer包源的管理和切换,避免与常见的CRM系统混淆,并提供了详细的安装和使用指南。 ... [详细]
  • 本文详细介绍了Git分布式版本控制系统中远程仓库的概念和操作方法。通过具体案例,帮助读者更好地理解和掌握如何高效管理代码库。 ... [详细]
  • 最近团队在部署DLP,作为一个技术人员对于黑盒看不到的地方还是充满了好奇心。多次咨询乙方人员DLP的算法原理是什么,他们都以商业秘密为由避而不谈,不得已只能自己查资料学习,于是有了下面的浅见。身为甲方,虽然不需要开发DLP产品,但是也有必要弄明白DLP基本的原理。俗话说工欲善其事必先利其器,只有在懂这个工具的原理之后才能更加灵活地使用这个工具,即使出现意外情况也能快速排错,越接近底层,越接近真相。根据DLP的实际用途,本文将DLP检测分为2部分,泄露关键字检测和近似重复文档检测。 ... [详细]
  • 本文介绍了如何利用npm脚本和concurrently工具,实现本地开发环境中多个监听服务的同时启动,包括HTTP服务、自动刷新、Sass和ES6支持。 ... [详细]
  • 本文探讨了在通过 API 端点调用时,使用猫鼬(Mongoose)的 findOne 方法总是返回 null 的问题,并提供了详细的解决方案和建议。 ... [详细]
  • 本文详细介绍如何在VSCode中配置自定义代码片段,使其具备与IDEA相似的代码生成快捷键功能。通过具体的Java和HTML代码片段示例,展示配置步骤及效果。 ... [详细]
  • 在网页开发中,页面加载速度是一个关键的用户体验因素。为了提升加载效率,避免在PageLoad事件中进行大量数据绑定操作,可以采用异步加载和特定控件来优化页面加载过程。 ... [详细]
  • 探讨在循环中调用$.post()时,回调函数为何会在循环结束后才开始执行,并提供解决方案和优化建议。 ... [详细]
  • 深入解析JMeter中的JSON提取器及其应用
    本文详细介绍了如何在JMeter中使用JSON提取器来获取和处理API响应中的数据。特别是在需要将一个接口返回的数据作为下一个接口的输入时,JSON提取器是一个非常有用的工具。 ... [详细]
  • 本文探讨了在 Vue 2.0 项目中使用 Axios 获取数据时可能出现的错误,并提供详细的解决方案和最佳实践。 ... [详细]
author-avatar
手机用户2502861455
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有