热门标签 | HotTags
当前位置:  开发笔记 > 开发工具 > 正文

java中的forkjoin框架的使用

这篇文章主要介绍了java中的forkjoin框架的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。

还记得之前的文章我们讲到了thread pool的基本结构吗?

  • ExecutorService - ForkJoinPool 用来调用任务执行。
  • workerThread - ForkJoinWorkerThread 工作线程,用来执行具体的任务。
  • task - ForkJoinTask 用来定义要执行的任务。

下面我们从这三个方面来详细讲解fork join框架。

ForkJoinPool

ForkJoinPool是一个ExecutorService的一个实现,它提供了对工作线程和线程池的一些便利管理方法。

public class ForkJoinPool extends AbstractExecutorService

一个work thread一次只能处理一个任务,但是ForkJoinPool并不会为每个任务都创建一个单独的线程,它会使用一个特殊的数据结构double-ended queue来存储任务。这样的结构可以方便的进行工作窃取(work-stealing)。

什么是work-stealing呢?

默认情况下,work thread从分配给自己的那个队列头中取出任务。如果这个队列是空的,那么这个work thread会从其他的任务队列尾部取出任务来执行,或者从全局队列中取出。这样的设计可以充分利用work thread的性能,提升并发能力。

下面看下怎么创建一个ForkJoinPool。

最常见的方法就是使用ForkJoinPool.commonPool()来创建,commonPool()为所有的ForkJoinTask提供了一个公共默认的线程池。

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一种方式是使用构造函数:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

这里的参数是并行级别,2指的是线程池将会使用2个处理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作线程。

public class ForkJoinWorkerThread extends Thread
}

和一般的线程不一样的是它定义了两个变量:

 final ForkJoinPool pool;    // the pool this thread works in
 final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一个是该worker thread所属的ForkJoinPool。 另外一个是支持 work-stealing机制的Queue。

再看一下它的run方法:

 public void run() {
  if (workQueue.array == null) { // only run once
   Throwable exception = null;
   try {
    onStart();
    pool.runWorker(workQueue);
   } catch (Throwable ex) {
    exception = ex;
   } finally {
    try {
     onTermination(exception);
    } catch (Throwable ex) {
     if (exception == null)
      exception = ex;
    } finally {
     pool.deregisterWorker(this, exception);
    }
   }
  }
 }

简单点讲就是从Queue中取出任务执行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中运行的任务类型。通常我们会用到它的两个子类:RecursiveActionRecursiveTask

他们都定义了一个需要实现的compute()方法用来实现具体的业务逻辑。不同的是RecursiveAction只是用来执行任务,而RecursiveTask可以有返回值。

既然两个类都带了Recursive,那么具体的实现逻辑也会跟递归有关,我们举个使用RecursiveAction来打印字符串的例子:

public class CustomRecursiveAction extends RecursiveAction {

 private String workload = "";
 private static final int THRESHOLD = 4;

 private static Logger logger =
   Logger.getAnonymousLogger();

 public CustomRecursiveAction(String workload) {
  this.workload = workload;
 }

 @Override
 protected void compute() {
  if (workload.length() > THRESHOLD) {
   ForkJoinTask.invokeAll(createSubtasks());
  } else {
   processing(workload);
  }
 }

 private List createSubtasks() {
  List subtasks = new ArrayList<>();

  String partOne= workload.substring(0, workload.length() / 2);
  String partTwo = workload.substring(workload.length() / 2, workload.length());

  subtasks.add(new CustomRecursiveAction(partOne));
  subtasks.add(new CustomRecursiveAction(partTwo));

  return subtasks;
 }

 private void processing(String work) {
  String result = work.toUpperCase();
  logger.info("This result - (" + result + ") - was processed by "
    + Thread.currentThread().getName());
 }
}

上面的例子使用了二分法来打印字符串。

我们再看一个RecursiveTask的例子:

public class CustomRecursiveTask extends RecursiveTask {
 private int[] arr;

 private static final int THRESHOLD = 20;

 public CustomRecursiveTask(int[] arr) {
  this.arr = arr;
 }

 @Override
 protected Integer compute() {
  if (arr.length > THRESHOLD) {
   return ForkJoinTask.invokeAll(createSubtasks())
     .stream()
     .mapToInt(ForkJoinTask::join)
     .sum();
  } else {
   return processing(arr);
  }
 }

 private Collection createSubtasks() {
  List dividedTasks = new ArrayList<>();
  dividedTasks.add(new CustomRecursiveTask(
    Arrays.copyOfRange(arr, 0, arr.length / 2)));
  dividedTasks.add(new CustomRecursiveTask(
    Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
  return dividedTasks;
 }

 private Integer processing(int[] arr) {
  return Arrays.stream(arr)
    .filter(a -> a > 10 && a <27)
    .map(a -> a * 10)
    .sum();
 }
}

和上面的例子很像,不过这里我们需要有返回值。

在ForkJoinPool中提交Task

有了上面的两个任务,我们就可以在ForkJoinPool中提交了:

int[] intArray= {12,12,13,14,15};
  CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);

  int result = forkJoinPool.invoke(customRecursiveTask);
  System.out.println(result);

上面的例子中,我们使用invoke来提交,invoke将会等待任务的执行结果。

如果不使用invoke,我们也可以将其替换成fork()和join():

customRecursiveTask.fork();
  int result2= customRecursiveTask.join();
  System.out.println(result2);

fork() 是将任务提交给pool,但是并不触发执行, join()将会真正的执行并且得到返回结果。

本文的例子可以参考https://github.com/ddean2009/learn-java-concurrency/tree/master/forkjoin

到此这篇关于java中的fork join框架的使用的文章就介绍到这了,更多相关java fork join框架内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


推荐阅读
  • 探索PWA H5 Web App优化之路(Service Worker与Lighthouse的应用)
    本文探讨了如何通过Service Worker和Lighthouse工具来优化PWA H5 Web App,旨在提升用户体验,包括提高加载速度、增强离线访问能力等方面。 ... [详细]
  • Flutter 高德地图插件使用指南
    本文档详细介绍了如何在Flutter项目中集成和使用高德地图插件,包括安装、配置及基本使用方法。 ... [详细]
  • Iris 开发环境配置指南 (最新 Go & IntelliJ IDEA & Iris V12)
    本指南详细介绍了如何在最新的 Go 语言环境及 IntelliJ IDEA 中配置 Iris V12 框架,适合初学者和有经验的开发者。文章提供了详细的步骤说明和示例代码,帮助读者快速搭建开发环境。 ... [详细]
  • WorldWind源代码解析:瓦片调度机制详解
    本文深入探讨了WorldWind项目中的关键组件——瓦片调度策略。通过源代码分析,我们将了解摄像头移动时如何动态调整瓦片的加载与卸载,确保地图渲染的高效与流畅。 ... [详细]
  • scrapyredis分布式爬虫 ... [详细]
  • 深入探讨配置文件的管理与优化
    尽管配置文件的重要性不言而喻,但其管理和安全性问题却常被忽视。本文将详细讨论配置文件的不同管理策略及其优缺点。 ... [详细]
  • 本文档详细介绍了在Vue2项目中集成Vant v2组件库的方法,包括安装步骤、版本选择以及如何参考官方文档进行开发。同时提供了其他常用Vue组件库的链接,供开发者对比选择。 ... [详细]
  • 使用H5在前端生成Excel文件的方法
    本文介绍了一种利用HTML5和JavaScript库在浏览器端直接生成并下载Excel文件的技术方案。通过引入alasql.js和xlsx.core.min.js两个库,可以轻松实现数据导出功能。 ... [详细]
  • 十大排序算法JavaScript实现总结
    十大排序算法JavaScript实现总结,Go语言社区,Golang程序员人脉社 ... [详细]
  • Linux 5.3内核正式发布并标记为稳定
    知名Linux内核开发者Greg Kroah-Hartman宣布,最新的Linux 5.3内核已正式标记为稳定版本,适用于大规模部署。该版本带来了多项新特性和改进,包括对最新硬件的支持和性能优化。 ... [详细]
  • 在上一期文章中,我们探讨了FastDev4Android项目中PullToRefreshListView组件的使用方法。本期将继续探讨该框架中的另一个重要组件——ACache数据缓存器,详细介绍其工作原理及如何在项目中有效利用。 ... [详细]
  • 本系列课程全面覆盖Java基础知识,适合初学者系统学习。通过与《JavaGuider-Java篇》结合使用,可有效查漏补缺。后续将深入探讨框架和项目实践。 ... [详细]
  • 本文详细介绍如何在Android模拟器上安装TaintDroid的过程,包括解决源代码链接失效及服务器文件变动等问题,旨在帮助后续用户避免不必要的麻烦。 ... [详细]
  • 中文分词是自然语言处理中的一个重要环节,由于中文没有明确的词边界标识,因此分词的准确性直接影响了后续处理的效果。常见的中文分词工具包括IK、jieba和THULAC等。本文将详细介绍如何在Elasticsearch中安装和使用IK分词器。 ... [详细]
  • Java类加载详解(类的生命周期)
    https:www.cnblogs.comjhxxbp10900405.html类从被加载到虚拟机内存开始,到卸载出内存为止。解析阶段在某些情况下可以在初始化后再 ... [详细]
author-avatar
ghost
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有