热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

java8中Stream的使用以及分割list案例

这篇文章主要介绍了java8中Stream的使用以及分割list案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

一、Steam的优势

java8中Stream配合Lambda表达式极大提高了编程效率,代码简洁易懂(可能刚接触的人会觉得晦涩难懂),不需要写传统的多线程代码就能写出高性能的并发程序

二、项目中遇到的问题

由于微信接口限制,每次导入code只能100个,所以需要分割list。但是由于code数量可能很大,这样执行效率就会很低。

1.首先想到是用多线程写传统并行程序,但是博主不是很熟练,写出代码可能会出现不可预料的结果,容易出错也难以维护。

2.然后就想到Steam中的parallel,能提高性能又能利用java8的特性,何乐而不为。

三、废话不多说,直接先贴代码,然后再解释(java8分割list代码在标题四)。

1.该方法是根据传入数量生成codes,private String getGeneratorCode(int tenantId)是我根据编码规则生成唯一code这个不需要管,我们要看的是Stream.iterate

2.iterate()第一个参数为起始值,第二个函数表达式(看自己想要生成什么样的流关键在这里),http://write.blog.csdn.net/postedit然后必须要通过limit方法来限制自己生成的Stream大小。parallel()是开启并行处理。map()就是一对一的把Stream中的元素映射成ouput Steam中的 元素。最后用collect收集,

2.1 构造流的方法还有Stream.of(),结合或者数组可直接list.stream();

String[] array = new String[]{"1","2","3"} ;

stream = Stream.of(array)或者Arrays.Stream(array);

2.2 数值流IntStream

int[] array = new int[]{1,2,3};

IntStream.of(array)或者IntStream.ranage(0,3)

3.以上构造流的方法都是已经知道大小,对于通过入参确定的应该图中方法自己生成流。

四、java8分割list,利用StreamApi实现。

没用java8前代码,做个鲜明对比():

1.list是我的编码集合(codes)。MAX_SEND为100(即每次100的大小去分割list),limit为按编码集合大小算出的本次需要分割多少次。

2.我们可以看到其实就是多了个skip跟limit方法。skip就是舍弃stream前多少个元素,那么limit就是返回流前面多少个元素(如果流里元素少于该值,则返回全部)。然后开启并行处理。通过循环我们的分割list的目标就达到了,每次取到的sendList就是100,100这样子的。

3.因为我这里业务就只需要到这里,如果我们分割之后需要收集之后再做处理,那只需要改写一下就ok;如:

List> splitList = Stream.iterate(0,n->n+1).limit(limit).parallel().map(a->{

 List sendList = list.stream().skip(a*MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList());

}).collect(Collectors.toList());

五、java8流里好像拿不到下标,所以我才用到构造一个递增数列当下标用,这就是我用java8分割list的过程,比以前的for循环看的爽心悦目,优雅些,性能功也提高了。

如果各位有更好的实现方式,欢迎留言指教。

补充知识:聊聊flink DataStream的split操作

本文主要研究一下flink DataStream的split操作

实例

SplitStream split = someDataStream.split(new OutputSelector() {
  @Override
  public Iterable select(Integer value) {
    List output = new ArrayList();
    if (value % 2 == 0) {
      output.add("even");
    }
    else {
      output.add("odd");
    }
    return output;
  }
});

本实例将dataStream split为两个dataStream,一个outputName为even,另一个outputName为odd

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream {
 
 //......
 
 public SplitStream split(OutputSelector outputSelector) {
 return new SplitStream<>(this, clean(outputSelector));
 }
 
 //......
}

DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

@PublicEvolving
public interface OutputSelector extends Serializable {
 
 Iterable select(OUT value);
 
}

OutputSelector定义了select方法用于给element打上outputNames

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

@PublicEvolving
public class SplitStream extends DataStream {
 
 protected SplitStream(DataStream dataStream, OutputSelector outputSelector) {
 super(dataStream.getExecutionEnvironment(), new SplitTransformation(dataStream.getTransformation(), outputSelector));
 }
 
 public DataStream select(String... outputNames) {
 return selectOutput(outputNames);
 }
 
 private DataStream selectOutput(String[] outputNames) {
 for (String outName : outputNames) {
  if (outName == null) {
  throw new RuntimeException("Selected names must not be null");
  }
 }
 
 SelectTransformation selectTransform = new SelectTransformation(this.getTransformation(), Lists.newArrayList(outputNames));
 return new DataStream(this.getExecutionEnvironment(), selectTransform);
 }
 
}

SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream;select方法创建了SelectTransformation

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

@Internal
public class StreamGraphGenerator {
 
 //......
 
 private Collection transform(StreamTransformation<&#63;> transform) {
 
 if (alreadyTransformed.containsKey(transform)) {
  return alreadyTransformed.get(transform);
 }
 
 LOG.debug("Transforming " + transform);
 
 if (transform.getMaxParallelism() <= 0) {
 
  // if the max parallelism hasn't been set, then first use the job wide max parallelism
  // from theExecutionConfig.
  int globalMaxParallelismFromCOnfig= env.getConfig().getMaxParallelism();
  if (globalMaxParallelismFromConfig > 0) {
  transform.setMaxParallelism(globalMaxParallelismFromConfig);
  }
 }
 
 // call at least once to trigger exceptions about MissingTypeInfo
 transform.getOutputType();
 
 Collection transformedIds;
 if (transform instanceof OneInputTransformation<&#63;, &#63;>) {
  transformedIds = transformOneInputTransform((OneInputTransformation<&#63;, &#63;>) transform);
 } else if (transform instanceof TwoInputTransformation<&#63;, &#63;, &#63;>) {
  transformedIds = transformTwoInputTransform((TwoInputTransformation<&#63;, &#63;, &#63;>) transform);
 } else if (transform instanceof SourceTransformation<&#63;>) {
  transformedIds = transformSource((SourceTransformation<&#63;>) transform);
 } else if (transform instanceof SinkTransformation<&#63;>) {
  transformedIds = transformSink((SinkTransformation<&#63;>) transform);
 } else if (transform instanceof UnionTransformation<&#63;>) {
  transformedIds = transformUnion((UnionTransformation<&#63;>) transform);
 } else if (transform instanceof SplitTransformation<&#63;>) {
  transformedIds = transformSplit((SplitTransformation<&#63;>) transform);
 } else if (transform instanceof SelectTransformation<&#63;>) {
  transformedIds = transformSelect((SelectTransformation<&#63;>) transform);
 } else if (transform instanceof FeedbackTransformation<&#63;>) {
  transformedIds = transformFeedback((FeedbackTransformation<&#63;>) transform);
 } else if (transform instanceof CoFeedbackTransformation<&#63;>) {
  transformedIds = transformCoFeedback((CoFeedbackTransformation<&#63;>) transform);
 } else if (transform instanceof PartitionTransformation<&#63;>) {
  transformedIds = transformPartition((PartitionTransformation<&#63;>) transform);
 } else if (transform instanceof SideOutputTransformation<&#63;>) {
  transformedIds = transformSideOutput((SideOutputTransformation<&#63;>) transform);
 } else {
  throw new IllegalStateException("Unknown transformation: " + transform);
 }
 
 // need this check because the iterate transformation adds itself before
 // transforming the feedback edges
 if (!alreadyTransformed.containsKey(transform)) {
  alreadyTransformed.put(transform, transformedIds);
 }
 
 if (transform.getBufferTimeout() >= 0) {
  streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
 }
 if (transform.getUid() != null) {
  streamGraph.setTransformationUID(transform.getId(), transform.getUid());
 }
 if (transform.getUserProvidedNodeHash() != null) {
  streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
 }
 
 if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
  streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
 }
 
 return transformedIds;
 }
 
 private  Collection transformSelect(SelectTransformation select) {
 StreamTransformation input = select.getInput();
 Collection resultIds = transform(input);
 
 // the recursive transform might have already transformed this
 if (alreadyTransformed.containsKey(select)) {
  return alreadyTransformed.get(select);
 }
 
 List virtualResultIds = new ArrayList<>();
 
 for (int inputId : resultIds) {
  int virtualId = StreamTransformation.getNewNodeId();
  streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
  virtualResultIds.add(virtualId);
 }
 return virtualResultIds;
 }
 
 private  Collection transformSplit(SplitTransformation split) {
 
 StreamTransformation input = split.getInput();
 Collection resultIds = transform(input);
 
 // the recursive transform call might have transformed this already
 if (alreadyTransformed.containsKey(split)) {
  return alreadyTransformed.get(split);
 }
 
 for (int inputId : resultIds) {
  streamGraph.addOutputSelector(inputId, split.getOutputSelector());
 }
 
 return resultIds;
 }
 
 //......
}

StreamGraphGenerator里头的transform会对SelectTransformation以及SplitTransformation进行相应的处理

transformSelect方法会根据select.getSelectedNames()来addVirtualSelectNode

transformSplit方法则根据split.getOutputSelector()来addOutputSelector

小结

DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream

OutputSelector定义了select方法用于给element打上outputNames

SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream

doc

DataStream Transformations

以上这篇java8中Stream的使用以及分割list案例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。


推荐阅读
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • Flink1.10定义UDAGG遇到SQL
    按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException:SQLvalidationfailed.nu ... [详细]
  • 以Flink为例,消除流处理常见的六大谬见
    以Flink为例,消除流处理常见的六大谬见 ... [详细]
  • 探讨了小型企业在构建安全网络和软件时所面临的挑战和机遇。本文介绍了如何通过合理的方法和工具,确保小型企业能够有效提升其软件的安全性,从而保护客户数据并增强市场竞争力。 ... [详细]
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • 本文详细介绍了 Java 中 org.apache.qpid.server.model.VirtualHost 类的 closeAsync() 方法,提供了具体的代码示例和应用场景。通过这些示例,读者可以更好地理解和使用该方法。 ... [详细]
  • 本文详细介绍如何通过修改配置文件来隐藏Apache、Nginx和PHP的版本号,从而增强网站的安全性。我们将提供具体的配置步骤,并解释这些设置的重要性。 ... [详细]
  • 本文详细介绍如何利用已搭建的LAMP(Linux、Apache、MySQL、PHP)环境,快速创建一个基于WordPress的内容管理系统(CMS)。WordPress是一款流行的开源博客平台,适用于个人或小型团队使用。 ... [详细]
author-avatar
oz法卡山英雄营
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有