热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink案例分析

Flink程序的执行过程no-desc说明详情1-env获取flink的执行环境批处理:ExecutionEnvironmentenvExecutionEnvironment.ge

Flink程序的执行过程

no-desc 说明 详情
1-env 获取flink的执行环境

批处理:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

流处理:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2-source 加载数据 1) socketTextStream – 读取Socket 数据流
​2) readTextFile() – 逐行读取文本文件获取数据流,每行都返回字符串
3) fromCollection() – 从集合中创建数据流
​4) fromElements() – 从给定的数据对象创建数据流,所有数据类型要一致
​5) addSource() – 添加新的源函数,例如从kafka 中读取数据,参见读取kafka 数据案例
3-transformation 对加载的数据进行转换  
4-sink 对结果进行保存或者打印 1) writeAsText() – 以字符串的形式逐行写入文件,调用每个元素的toString()得到写入的字符串
2) writeAsCsv() – 将元组写出以逗号分隔的csv 文件。注意:只能作用到元组数据上
​3) print() – 控制台直接输出结果,调用对象的toString()方法得到输出结果。
​4) addSink() – 自定义接收函数。例如将结果保存到kafka 中,参见kafka 案例
5-execute 触发flink程序的执行 代码流程必须符合 source ->transformation -> sink transformation 都是执行,需要最后使用env.execute()或者使用 print(),count(),collect() 触发执行

Flink 案例分析

注意

Flink编程不是基于K,V格式的编程,通过某些方式来指定虚拟key

Flink中的tuple最多支持25个元素,每个元素是从0开始

算子

中间处理、转换的环节是通过不同的算子完成的。

算子将一个或多个DataStream转换为新的DataStream

转型 描述
Map
DataStream→DataStream

采用一个数据元并生成一个数据元。一个map函数,它将输入流的值加倍:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
FlatMap
DataStream→DataStream

采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
Filter
DataStream→DataStream

计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});    
KeyBy
DataStream→KeyedStream

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法

此转换返回KeyedStream,其中包括使用被Keys化状态所需KeyedStream

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
    

注意 如果出现以下情况,则类型不能成为关键

  1. 它是POJO类型但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
  2. 它是任何类型的数组。
Reduce
KeyedStream→DataStream

被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。

reduce函数,用于创建部分和的流:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

案例1: 元素处理

env: 批

Source:fromElements

Sink:print

算子:Map

public class MapTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet dataSet = env.fromElements(1, 2, -3, 0, 5, -9, 8);
        DataSet dataSet2 = dataSet.map(new Tokenizer());
//        DataSet dataSet2 = dataSet.map(i->i * 2);
        dataSet2.print();
    }

    public static class Tokenizer implements MapFunction {
        @Override
        public Integer map(Integer in) {
            return in * 2;
        }
    }
}

案例2: 词频统计

env: 批

Source:readTextFile 

Sink:writeAsCsv

算子:Map

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet dataSet = env.readTextFile("/yourpath/in.txt");

        DataSet> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                dataSet.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        String outputPath = "/yourpath/out.txt";
        counts.writeAsCsv(outputPath, "\n", " ");
        env.execute("myflink");
    }

    public static class Tokenizer implements FlatMapFunction> {
        @Override
        public void flatMap(String value, Collector> out) {
            String[] tokens = value.split(" ");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2(token, 1));
                }
            }
        }
    }
}

案例3:数据流汇总

env: 流

Source:addSource

Sink:print

算子:keyBy、Reduce

public class ReduceTest {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceTest.class);
    private static final String[] TYPE = {"苹果", "梨", "西瓜", "葡萄", "火龙果"};

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
        DataStreamSource> orderSource = env.addSource(new SourceFunction>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(Tuple2.of(TYPE[random.nextInt(TYPE.length)], 1));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }

        }, "order-info");

        orderSource.keyBy(0)
                //将上一元素与当前元素相加后,返回给下一元素处理
                .reduce(new ReduceFunction>() {
                    @Override
                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2)
                            throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}

 

Source:readTextFile 

Sink:writeAsCsv

算子:Map

参考

https://blog.csdn.net/qq_40929921/article/details/99603150

https://flink.sojb.cn/dev/stream/operators/


推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 在尝试对 QQmlPropertyMap 类进行测试驱动开发时,发现其派生类中无法正常调用槽函数或 Q_INVOKABLE 方法。这可能是由于 QQmlPropertyMap 的内部实现机制导致的,需要进一步研究以找到解决方案。 ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • 本文详细解析了使用C++实现的键盘输入记录程序的源代码,该程序在Windows应用程序开发中具有很高的实用价值。键盘记录功能不仅在远程控制软件中广泛应用,还为开发者提供了强大的调试和监控工具。通过具体实例,本文深入探讨了C++键盘记录程序的设计与实现,适合需要相关技术的开发者参考。 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文详细介绍了 Java 中遍历 Map 对象的几种常见方法及其应用场景。首先,通过 `entrySet` 方法结合增强型 for 循环进行遍历是最常用的方式,适用于需要同时访问键和值的场景。此外,还探讨了使用 `keySet` 和 `values` 方法分别遍历键和值的技巧,以及使用迭代器(Iterator)进行更灵活的遍历操作。每种方法都附有示例代码和具体的应用实例,帮助开发者更好地理解和选择合适的遍历策略。 ... [详细]
  • 在使用 Qt 进行 YUV420 图像渲染时,由于 Qt 本身不支持直接绘制 YUV 数据,因此需要借助 QOpenGLWidget 和 OpenGL 技术来实现。通过继承 QOpenGLWidget 类并重写其绘图方法,可以利用 GPU 的高效渲染能力,实现高质量的 YUV420 图像显示。此外,这种方法还能显著提高图像处理的性能和流畅性。 ... [详细]
  • 本文将继续探讨 JavaScript 函数式编程的高级技巧及其实际应用。通过一个具体的寻路算法示例,我们将深入分析如何利用函数式编程的思想解决复杂问题。示例中,节点之间的连线代表路径,连线上的数字表示两点间的距离。我们将详细讲解如何通过递归和高阶函数等技术实现高效的寻路算法。 ... [详细]
  • QT框架中事件循环机制及事件分发类详解
    在QT框架中,QCoreApplication类作为事件循环的核心组件,为应用程序提供了基础的事件处理机制。该类继承自QObject,负责管理和调度各种事件,确保程序能够响应用户操作和其他系统事件。通过事件循环,QCoreApplication实现了高效的事件分发和处理,使得应用程序能够保持流畅的运行状态。此外,QCoreApplication还提供了多种方法和信号槽机制,方便开发者进行事件的定制和扩展。 ... [详细]
  • 本文介绍了如何在iOS平台上使用GLSL着色器将YV12格式的视频帧数据转换为RGB格式,并展示了转换后的图像效果。通过详细的技术实现步骤和代码示例,读者可以轻松掌握这一过程,适用于需要进行视频处理的应用开发。 ... [详细]
  • 本文对常见的字符串哈希函数进行了全面分析,涵盖了BKDRHash、APHash、DJBHash、JSHash、RSHash、SDBMHash、PJWHash和ELFHash等多种算法。这些哈希函数在不同的应用场景中表现出各异的性能特点,通过对比其算法原理、计算效率和碰撞概率,为实际应用提供了有价值的参考。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 在Android 4.4系统中,通过使用 `Intent` 对象并设置动作 `ACTION_GET_CONTENT` 或 `ACTION_OPEN_DOCUMENT`,可以从相册中选择图片并获取其路径。具体实现时,需要为 `Intent` 添加相应的类别,并处理返回的 Uri 以提取图片的文件路径。此方法适用于需要从用户相册中选择图片的应用场景,能够确保兼容性和用户体验。 ... [详细]
  • 在前文探讨了Spring如何为特定的bean选择合适的通知器后,本文将进一步深入分析Spring AOP框架中代理对象的生成机制。具体而言,我们将详细解析如何通过代理技术将通知器(Advisor)中包含的通知(Advice)应用到目标bean上,以实现切面编程的核心功能。 ... [详细]
author-avatar
大大醯_804_224
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有