热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink中Transform怎么用

小编给大家分享一下Flink中Transform怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,

小编给大家分享一下Flink中Transform怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

分组聚合
  String path = "E:\\GIT\\flink-learn\\flink-learn\\telemetering.txt";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TupleTypeInfo> typeInfo = new TupleTypeInfo<>(Types.STRING, Types.DOUBLE, Types.LONG);

        TupleCsvInputFormat> tupleCsvInputFormat =
                new TupleCsvInputFormat<>(new Path(path), typeInfo);

        DataStreamSource> dataStreamSource = env.createInput(tupleCsvInputFormat, typeInfo);
        //或   DataStreamSource> dataStreamSource = env.readFile(tupleCsvInputFormat, path);

        SingleOutputStreamOperator> operator = dataStreamSource
                .filter(Objects::nonNull)
//                .map()
//                .flatMap()
//                .keyBy(0)
                .keyBy(tuple -> tuple.f0)
                .minBy(1);
//                .min()
//                .max(1);
//                .maxBy(1, false);
//                .sum(1);
//                .reduce();
//                .process();
        operator.print().setParallelism(1);
        env.execute();
分流/合流
String path = "E:\\GIT\\flink-learn\\flink-learn\\telemetering.txt";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        PojoTypeInfo typeInfo = (PojoTypeInfo) Types.POJO(TelemeterDTO.class);
        PojoCsvInputFormat inputFormat = new PojoCsvInputFormat<>(new Path(path), typeInfo, new String[]{"code", "value", "timestamp"});
        DataStreamSource dataStreamSource = env.createInput(inputFormat, typeInfo);

        //分流
        SplitStream splitStream = dataStreamSource
                .split(item -> {
                    if (item.getValue() > 100) {
                        return Collections.singletonList("high");
                    }
                    return Collections.singletonList("low");
                });

        DataStream highStream = splitStream.select("high");
        DataStream lowStream = splitStream.select("low");

        //合流
        ConnectedStreams connectedStreams = lowStream.connect(highStream);
//        DataStream unionDataStream = lowStream.union(highStream); //需要类型一致

        SingleOutputStreamOperator> operator = connectedStreams
                .map(new CoMapFunction>() {
                    @Override
                    public Tuple3 map1(TelemeterDTO value) {
                        return Tuple3.of(value.getCode(), value.getValue(), value.getTimestamp());
                    }

                    @Override
                    public Tuple3 map2(TelemeterDTO value) {
                        return Tuple3.of(value.getCode(), value.getValue(), value.getTimestamp());
                    }
                });

        operator.print();
        env.execute();
UDF函数,提供底层支持
  • MapFunction

  • FilterFunction

  • ReduceFunction

  • ProcessFunction

  • SourceFunction

  • SinkFunction

富函数

富函数 包含了生命周期,及上下文相关信息,如

  • open() 可以在算子创建之初建立数据库连接

  • close() 在在算子生命结束之前关闭资源

以上是“Flink中Transform怎么用”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程笔记行业资讯频道!


推荐阅读
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • 探索聚类分析中的K-Means与DBSCAN算法及其应用
    聚类分析是一种用于解决样本或特征分类问题的统计分析方法,也是数据挖掘领域的重要算法之一。本文主要探讨了K-Means和DBSCAN两种聚类算法的原理及其应用场景。K-Means算法通过迭代优化簇中心来实现数据点的划分,适用于球形分布的数据集;而DBSCAN算法则基于密度进行聚类,能够有效识别任意形状的簇,并且对噪声数据具有较好的鲁棒性。通过对这两种算法的对比分析,本文旨在为实际应用中选择合适的聚类方法提供参考。 ... [详细]
  • 本文详细解析了使用C++实现的键盘输入记录程序的源代码,该程序在Windows应用程序开发中具有很高的实用价值。键盘记录功能不仅在远程控制软件中广泛应用,还为开发者提供了强大的调试和监控工具。通过具体实例,本文深入探讨了C++键盘记录程序的设计与实现,适合需要相关技术的开发者参考。 ... [详细]
  • 在List和Set集合中存储Object类型的数据元素 ... [详细]
  • 在Ubuntu上安装MySQL时解决缺少libaio.so.1错误及libaio在MySQL中的重要性分析
    在Ubuntu系统上安装MySQL时,遇到了缺少libaio.so.1的错误。本文详细介绍了如何解决这一问题,并深入探讨了libaio库在MySQL性能优化中的重要作用。对于初学者而言,理解这些依赖关系和配置步骤是成功安装和运行MySQL的关键。通过本文的指导,读者可以顺利解决相关问题,并更好地掌握MySQL在Linux环境下的部署与管理。 ... [详细]
  • 通过使用CIFAR-10数据集,本文详细介绍了如何快速掌握Mixup数据增强技术,并展示了该方法在图像分类任务中的显著效果。实验结果表明,Mixup能够有效提高模型的泛化能力和分类精度,为图像识别领域的研究提供了有价值的参考。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 利用 Python 中的 Altair 库实现数据抖动的水平剥离分析 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 在第七天的深度学习课程中,我们将重点探讨DGL框架的高级应用,特别是在官方文档指导下进行数据集的下载与预处理。通过详细的步骤说明和实用技巧,帮助读者高效地构建和优化图神经网络的数据管道。此外,我们还将介绍如何利用DGL提供的模块化工具,实现数据的快速加载和预处理,以提升模型训练的效率和准确性。 ... [详细]
  • 本文详细介绍了批处理技术的基本概念及其在实际应用中的重要性。首先,对简单的批处理内部命令进行了概述,重点讲解了Echo命令的功能,包括如何打开或关闭回显功能以及显示消息。如果没有指定任何参数,Echo命令会显示当前的回显设置。此外,文章还探讨了批处理技术在自动化任务执行、系统管理等领域的广泛应用,为读者提供了丰富的实践案例和技术指导。 ... [详细]
  • 在关系型数据库中,数据约束是指在向数据表中插入数据时必须遵循的限制条件。在MySQL和MariaDB中,常见的数据约束包括主键约束、唯一键约束、外键约束以及非空约束等。这些约束确保了数据的完整性和一致性,是数据库管理中的重要组成部分。通过合理设置和使用这些约束,可以有效防止数据冗余和错误,提升数据库的可靠性和性能。 ... [详细]
  • 每年,意甲、德甲、英超和西甲等各大足球联赛的赛程表都是球迷们关注的焦点。本文通过 Python 编程实现了一种生成赛程表的方法,该方法基于蛇形环算法。具体而言,将所有球队排列成两列的环形结构,左侧球队对阵右侧球队,首支队伍固定不动,其余队伍按顺时针方向循环移动,从而确保每场比赛不重复。此算法不仅高效,而且易于实现,为赛程安排提供了可靠的解决方案。 ... [详细]
  • 本文将深入探讨生成对抗网络(GAN)在计算机视觉领域的应用。作为该领域的经典模型,GAN通过生成器和判别器的对抗训练,能够高效地生成高质量的图像。本文不仅回顾了GAN的基本原理,还将介绍一些最新的进展和技术优化方法,帮助读者全面掌握这一重要工具。 ... [详细]
  • 在处理多个玩家的相机控制时,我遇到了一个挑战,即无法在运行时动态添加播放器子对象以转换数组类型。为了解决这个问题,我在 `CameraControl.cs` 脚本中采取了临时措施。该脚本负责根据玩家的数量动态调整相机的缩放范围,确保所有玩家都能被相机捕捉到。 ... [详细]
author-avatar
湘刘涛
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有