热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

javaflink使用详细教程

本文讲述了关于Javaflink使用详细教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过

attachments-2023-03-SEyJwf58640a94c6dbc09.jpg

本文讲述了关于Java flink使用详细教程!具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧,具体如下:

首先介绍Flink DataSet API实现统计单词频次程序,然后简要看下用于实时流式数据处理的DataStream API。

maven依赖

    org.apache.flink

    flink-java

    1.2.0

    org.apache.flink

    flink-test-utils_2.10

    1.2.0

    test

核心API概念

使用Flink时,选哟知道一些API相关的概念:

每个在分布式集合数据执行转换程序,需要使用多个转换数据函数,包括:filtering, mapping, joining, grouping, and aggregating。

Flink中sink操作触发流执行产生程序期望的结果,例如,将结果保存到文件系统或打印到标准输出。

Flink转换是懒执行,意味着知道sink操作执行才会真正执行。

Flink API支持两种模式——批处理和实时处理。对于有限数据源使用批模式,使用DataSet API;处理无界实时流数据,应该DataStream API。

DataSet API转换数据

Flink程序的入口点是ExecutionEnvironment 类的实例, 它定义了程序执行的上下文。下面创建ExecutionEnvironment对下并开始处理数据:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

注:当你在本地机器上启动程序,则仅在本地JVM上执行处理。如果需要在集群环境中启动处理,则应该在集群中每个服务器上按照Apache Flink并配置相应ExecutionEnvironment。

创建数据集(DataSet)

要执行数据转换,需要提供数据。下面使用ExecutionEnvironement创建DataSet class :

DataSet amounts = env.fromElements(1, 29, 40, 50);

也可以从其他数据源创建数据集,如Apache Kafka、CSV文件或其他数据源。

过滤和归约

准备好数据集,就可以进行过滤和转换。假设我们需要根据某阈值进行过滤,然后对过滤后的数据进行累加。则可以使用 filter() 和 reduce() 函数实现:

int threshold = 30;

List collect = amounts

  .filter(a -> a > threshold)

  .reduce((integer, t1) -> integer + t1)

  .collect();

assertThat(collect.get(0)).isEqualTo(90);

注:collect()方法是sink操作,它实际触发数据转换。

map映射

假设我们有Person对象数据集:

private static class Person {

    private int age;

    private String name;

    // standard constructors/getters/setters

}

接着创建该对象的数据集:

DataSet persOnDataSource= env.fromCollection(

  Arrays.asList(

    new Person(23, "Tom"),

    new Person(75, "Michael")));

如果我们仅需要每个对象的age属性,可以使用map转换方法实现:

List ages = personDataSource

  .map(p -> p.age)

  .collect();

assertThat(ages).hasSize(2);

assertThat(ages).contains(23, 75);

join方法

可以对两个数据集基于ID字段进行关联操作,实现连接转换。下面创建用户的事务和地址数据集:

Tuple3 address

  = new Tuple3<>(1, "5th Avenue", "London");

DataSet> addresses

  = env.fromElements(address);

Tuple2 firstTransaction 

  = new Tuple2<>(1, "Transaction_1");

DataSet> transactions 

  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

两个元组的第一个字段都是整型,这是连接两个数据集的ID字段。为了执行实际连接逻辑,需要实现地址和事务数据集的KeySelector接口:

private static class IdKeySelectorTransaction 

  implements KeySelector, Integer> {

    @Override

    public Integer getKey(Tuple2 value) {

        return value.f0;

    }

}

private static class IdKeySelectorAddress 

  implements KeySelector, Integer> {

    @Override

    public Integer getKey(Tuple3 value) {

        return value.f0;

    }

}

每个选择器只返回应该执行联接的字段。不幸的是不能使用lambda表达式简化实现,应该Flink需要泛型类型信息。

接着使用选择器实现合并逻辑:

List, Tuple3>>

  joined = transactions.join(addresses)

  .where(new IdKeySelectorTransaction())

  .equalTo(new IdKeySelectorAddress())

  .collect();

assertThat(joined).hasSize(1);

assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

排序

首先准备一些实例数据,Tuple2类型集合:

Tuple2 secOndPerson= new Tuple2<>(4, "Tom");

Tuple2 thirdPerson = new Tuple2<>(5, "Scott");

Tuple2 fourthPerson = new Tuple2<>(200, "Michael");

Tuple2 firstPerson = new Tuple2<>(1, "Jack");

DataSet> transactiOns= env.fromElements(

  fourthPerson, secondPerson, thirdPerson, firstPerson);

如何需要按Tuple2中第一个字段进行排序,需要使用sortPartition方法执行转换:

List> sorted = transactions

  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)

  .collect();

assertThat(sorted)

  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

经典示例

单词计数是现实大数据处理框架的经典示例,主要对数据文本的内容处理计算单词频数。本节提供Flink实现版本。首先创建LineSplitter 类分割输入为单词,收集每个单词的Tuple2类型(key-value), key即输入中发现的每个单词,value为常数1。

该类实现FlatMapFunction接口,它接收字符串作为输入,产生 Tuple2作为输出:

public class LineSplitter implements FlatMapFunction> {

    @Override

    public void flatMap(String value, Collector> out) {

        Stream.of(value.toLowerCase().split("\\W+"))

          .filter(t -> t.length() > 0)

          .forEach(token -> out.collect(new Tuple2<>(token, 1)));

    }

}

然后调用Collector类的collect方法,推送数据至处理流水线。接着按第一个元素(单词)对元组进行分组并执行sum聚集方法对元组的第二个元素进行求和计算单词的频数。

public static DataSet> startWordCount(

  ExecutionEnvironment env, List lines) throws Exception {

    DataSet text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())

      .groupBy(0)

      .aggregate(Aggregations.SUM, 1);

}

我们使用了三种Flink转换类型:flatMap(), groupBy() 和 aggregate()。下面写完整测试是否与期望一致:

List lines = Arrays.asList(

  "This is a first sentence",

  "This is a second sentence with a one word");

DataSet> result = WordCount.startWordCount(env, lines);

List> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(

  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),

  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),

  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

DataStream API 转换数据

创建DataStream

Apache Flink 通过DataStream API支持事件流处理。首先需要使用StreamExecutionEnvironment 类消费事件:

StreamExecutionEnvironment executionEnvironment

 = StreamExecutionEnvironment.getExecutionEnvironment();

接着使用executionEnvironment从不同来源创建事件流,它可以是消息总线,如Apache Kafka,但我们简单创建一组字符串元素:

DataStream dataStream = executionEnvironment.fromElements(

  "This is a first sentence", 

  "This is a second sentence with a one word");

和DataSet类一样,可以对DataStream中的元素应用转换:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

为了触发执行,需要执行sink操作,如print()方法,把转换结果打印至控制台,接着执行StreamExecutionEnvironment 类的execute方法:

upperCase.print();

env.execute();

程序会产生下面输出结果:

1> THIS IS A FIRST SENTENCE

2> THIS IS A SECOND SENTENCE WITH A ONE WORD

窗口事件

当实时处理事件流时,可能需要把一些事件分为组,基于这些事件窗口进行计算。

假设事件流中每个事件发送至我们系统中,其中包括事件量和时间戳。我们可以容许事件无序到达,但前提是它们的延迟不超过20秒。对于这种场景首先创建一个流来模拟两个相隔几分钟的事件,并定义一个时间戳提取器来指定延迟阈值:

SingleOutputStreamOperator> windowed

  = env.fromElements(

  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),

  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))

  .assignTimestampsAndWatermarks(

    new BoundedOutOfOrdernessTimestampExtractor

      >(Time.seconds(20)) {

 

        @Override

        public long extractTimestamp(Tuple2 element) {

          return element.f1 * 1000;

        }

    });

接下来定义一个窗口操作,将事件分组到5秒的窗口中,并对这些事件应用转换:

SingleOutputStreamOperator> reduced = windowed

  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

  .maxBy(0, true);

reduced.print();

它将获得每5秒窗口的最后一个元素,因此它输出:

1> (15,1491221519)

请注意,我们没有看到第二个事件,因为它的到达时间晚于指定的延迟阈值。

总结

本文简要介绍了Apache Flink框架,并通过示例展示如何使用一些转换API,包括利用DataSet API实现单词频次计算,利用DataStream API 实现简单实时事件流转换。


推荐阅读
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 本文介绍了如何利用Apache POI库高效读取Excel文件中的数据。通过实际测试,除了分数被转换为小数存储外,其他数据均能正确读取。若在使用过程中发现任何问题,请及时留言反馈,以便我们进行更新和改进。 ... [详细]
  • 在探讨C语言编程文本编辑器的最佳选择与专业推荐时,本文将引导读者构建一个基础的文本编辑器程序。该程序不仅能够打开并显示文本文件的内容及其路径,还集成了菜单和工具栏功能,为用户提供更加便捷的操作体验。通过本案例的学习,读者可以深入了解文本编辑器的核心实现机制。 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 在Java项目中,当两个文件进行互相调用时出现了函数错误。具体问题出现在 `MainFrame.java` 文件中,该文件位于 `cn.javass.bookmgr` 包下,并且导入了 `java.awt.BorderLayout` 和 `java.awt.Event` 等相关类。为了确保项目的正常运行,请求提供专业的解决方案,以解决函数调用中的错误。建议从类路径、依赖关系和方法签名等方面入手,进行全面排查和调试。 ... [详细]
  • 本文探讨了 Java 中 Pair 类的历史与现状。虽然 Java 标准库中没有内置的 Pair 类,但社区和第三方库提供了多种实现方式,如 Apache Commons 的 Pair 类和 JavaFX 的 javafx.util.Pair 类。这些实现为需要处理成对数据的开发者提供了便利。此外,文章还讨论了为何标准库未包含 Pair 类的原因,以及在现代 Java 开发中使用 Pair 类的最佳实践。 ... [详细]
  • 在前文探讨了Spring如何为特定的bean选择合适的通知器后,本文将进一步深入分析Spring AOP框架中代理对象的生成机制。具体而言,我们将详细解析如何通过代理技术将通知器(Advisor)中包含的通知(Advice)应用到目标bean上,以实现切面编程的核心功能。 ... [详细]
  • 在Eclipse中批量转换Java源代码文件的编码格式从GBK到UTF-8是一项常见的需求。通过编写简单的Java代码,可以高效地实现这一任务。该方法不仅适用于Java文件,还可以用于其他类型的文本文件编码转换。具体实现可以通过导入`java.io.File`类来操作文件系统,从而完成批量转换。此外,建议在转换过程中添加异常处理机制,以确保代码的健壮性和可靠性。 ... [详细]
  • 本文介绍了一种自定义的Android圆形进度条视图,支持在进度条上显示数字,并在圆心位置展示文字内容。通过自定义绘图和组件组合的方式实现,详细展示了自定义View的开发流程和关键技术点。示例代码和效果展示将在文章末尾提供。 ... [详细]
  • 本文介绍了如何在iOS平台上使用GLSL着色器将YV12格式的视频帧数据转换为RGB格式,并展示了转换后的图像效果。通过详细的技术实现步骤和代码示例,读者可以轻松掌握这一过程,适用于需要进行视频处理的应用开发。 ... [详细]
  • 在Android 4.4系统中,通过使用 `Intent` 对象并设置动作 `ACTION_GET_CONTENT` 或 `ACTION_OPEN_DOCUMENT`,可以从相册中选择图片并获取其路径。具体实现时,需要为 `Intent` 添加相应的类别,并处理返回的 Uri 以提取图片的文件路径。此方法适用于需要从用户相册中选择图片的应用场景,能够确保兼容性和用户体验。 ... [详细]
  • 深入解析:React与Webpack配置进阶指南(第二部分)
    在本篇进阶指南的第二部分中,我们将继续探讨 React 与 Webpack 的高级配置技巧。通过实际案例,我们将展示如何使用 React 和 Webpack 构建一个简单的 Todo 应用程序,具体包括 `TodoApp.js` 文件中的代码实现,如导入 React 和自定义组件 `TodoList`。此外,我们还将深入讲解 Webpack 配置文件的优化方法,以提升开发效率和应用性能。 ... [详细]
  • 在处理遗留数据库的映射时,反向工程是一个重要的初始步骤。由于实体模式已经在数据库系统中存在,Hibernate 提供了自动化工具来简化这一过程,帮助开发人员快速生成持久化类和映射文件。通过反向工程,可以显著提高开发效率并减少手动配置的错误。此外,该工具还支持对现有数据库结构进行分析,自动生成符合 Hibernate 规范的配置文件,从而加速项目的启动和开发周期。 ... [详细]
author-avatar
西门庆重生727
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有