热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Fork/Join框架与Java8StreamAPI之并行流的速度比较

ForkJoin框架有特定的ExecutorService和线程池构成。ExecutorService可以运行任务,并且这个任务会被分解成较小的任务,它们从线程池中被fork(被不

  Fork/Join 框架有特定的ExecutorService和线程池构成。ExecutorService可以运行任务,并且这个任务会被分解成较小的任务,它们从线程池中被fork(被不同的线程执行)出来,在join(即它的所有的子任务都完成了)之前会一直等待。

  Fork/Join 使用了任务窃取来最小化线程的征用和开销。线程池中的每条工作线程都有自己的双端工作队列并且会将新任务放到这个队列中去。它从队列的头部读取任务。如果队列是空的,工作线程就尝试从另外一个队列的末尾获取一个任务。窃取操作不会很频繁,因为工作线程会采用后进先出的顺序将任务放入它们的队列中,同时工作项的规模会随着问题分割成子问题而变小。你一开始把任务交给一个中心的工作线程,之后它会继续将这个任务分解成更小的任务。最终所有的工作线程都只会设计很少量的同步操作。

  Stream介绍(引)

  Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 对 XML 解析的 Stream,也不是 Amazon Kinesis 对大数据实时处理的 Stream。Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。通常编写并行代码很难而且容易出错, 但使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。所以说,Java 8 中首次出现的 java.util.stream 是一个函数式语言+多核时代综合影响的产物。

  Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

  Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

  而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。

 

  所以说,实际上Stream并行流实际上就是一个帮你fork/join 后的API,为了验证效率,我编写了一个对1000_000个数进行排序的程序

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelMergeSort {

    public static void main(String[] args) {
        final int SIZE = 10000000;
        int[] list1 = new int[SIZE];
        int[] list2 = new int[SIZE];
        Integer[] list3 = new Integer[SIZE];
        
        for (int i = 0; i ) {
            list1[i] = list2[i] = (int)(Math.random() * 10000000);
            list3[i] = list1[i];
        }
        
        long startTime = System.currentTimeMillis();
        parallelMergeSort(list1);
        long endTime = System.currentTimeMillis();
        System.out.println("Parallel time with " + Runtime.getRuntime().availableProcessors() + " processors is " + (endTime - startTime) + " milliseconds");
        
        startTime = System.currentTimeMillis();
        MergeSort.mergeSort(list2);
        endTime = System.currentTimeMillis();
        System.out.println("Sequent time is " + (endTime - startTime) + " milliseconds");
        
        
        List tmp = new ArrayList();
        Collections.addAll(tmp, list3);
        startTime = System.currentTimeMillis();
        IntStream tmp1 = tmp.stream().parallel().mapToInt(Integer::intValue).sorted();
        endTime = System.currentTimeMillis();
        System.out.println("ParallelStream time is " + (endTime - startTime) + " milliseconds");
        
        tmp1.limit(100).forEachOrdered(System.out::println);
    
        /*
        for(int i = 0; i <100; i++) {
            System.out.println(tmp2.get(i));
        }*/
    }


    public static void parallelMergeSort(int[] list) {
        RecursiveAction mainTask = new SortTask(list);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(mainTask);
    }
    
    public static class SortTask extends RecursiveAction{
        /**
         * 
         */
        private static final long serialVersiOnUID= 1L;
        private final int THRESHOLD = 500;
        private int[] list;
        
        SortTask(int[] list){
            this.list = list;
        }
        @Override
        protected void compute() {
            if (list.length < THRESHOLD)
                java.util.Arrays.sort(list);
            else {
                //Obtain the first half
                int[] firstHalf = new int[list.length / 2];
                System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
                
                //Obtain the second half
                int secOndHalfLength= list.length - list.length / 2;
                int[] secOndHalf= new int[secondHalfLength];
                System.arraycopy(list, list.length /2, secondHalf, 0, secondHalfLength);
                
                //Recursively sort the two halves
                invokeAll(new SortTask(firstHalf), new SortTask(secondHalf));
                
                //Merge firstHalf with second
                MergeSort.merge(firstHalf, secondHalf, list);
            }
        }
    }
    
    public static class MergeSort {
          /** The method for sorting the numbers */
          public static void mergeSort(int[] list) {
            if (list.length > 1) {
              // Merge sort the first half
              int[] firstHalf = new int[list.length / 2];
              System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
              mergeSort(firstHalf);

              // Merge sort the second half
              int secOndHalfLength= list.length - list.length / 2;
              int[] secOndHalf= new int[secondHalfLength];
              System.arraycopy(list, list.length / 2,
                secondHalf, 0, secondHalfLength);
              mergeSort(secondHalf);

              // Merge firstHalf with secondHalf into list
              merge(firstHalf, secondHalf, list);
            }
          }

          /** Merge two sorted lists */
          public static void merge(int[] list1, int[] list2, int[] temp) {
            int current1 = 0; // Current index in list1
            int current2 = 0; // Current index in list2
            int current3 = 0; // Current index in temp

            while (current1  list2.length) {
              if (list1[current1] < list2[current2])
                temp[current3++] = list1[current1++];
              else
                temp[current3++] = list2[current2++];
            }

            while (current1 < list1.length)
              temp[current3++] = list1[current1++];

            while (current2 < list2.length)
              temp[current3++] = list2[current2++];
          }
    }
}

代码可以看到,利用三种方法,对随机生成的 int 数据排序

第一种是自己编写的fork/join利用二分法排序

第二种是单线程下的二分法排序

第三种是并行流的排序

为了验证并行流是否排序正确,输出流前100个数

结果如图:

但是这是为没有收集器的情况,并行流很快的完成并且得到IntStream,加上收集器后:

可以看出,排序很快完成,在最后的类型转换上花费了大量的时间,

而根据Stream 的介绍,实验fork/join方法完成的时间应该不会与并行流差距太大,实际上,实验中编写的代码在fork分解阶段和join阶段花费了大量时间,远不如直接使用API快速

但是如果正确使用fork/join框架的话也不会很慢

但是相比单线程已经远远提升了效率

 


推荐阅读
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • 网络请求模块选择——axios框架的基本使用和封装
    本文介绍了选择网络请求模块axios的原因,以及axios框架的基本使用和封装方法。包括发送并发请求的演示,全局配置的设置,创建axios实例的方法,拦截器的使用,以及如何封装和请求响应劫持等内容。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 本文讨论了如何在codeigniter中识别来自angularjs的请求,并提供了两种方法的代码示例。作者尝试了$this->input->is_ajax_request()和自定义函数is_ajax(),但都没有成功。最后,作者展示了一个ajax请求的示例代码。 ... [详细]
  • 这篇文章主要介绍了Python拼接字符串的七种方式,包括使用%、format()、join()、f-string等方法。每种方法都有其特点和限制,通过本文的介绍可以帮助读者更好地理解和运用字符串拼接的技巧。 ... [详细]
author-avatar
阿离说你是宝贝
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有