热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:Flink快速上手之wordCount(java)

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink快速上手之wordCount(java)相关的知识,希望对你有一定的参考价值。创建m

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink快速上手之wordCount(java)相关的知识,希望对你有一定的参考价值。




  • 创建maven功臣

  • pom文件


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
MyFlink
MyFlink
1.0-SNAPSHOT



org.apache.flink
flink-java
1.9.0
compile


org.apache.flink
flink-streaming-java_2.11
1.9.0
compile





org.apache.maven.plugins
maven-shade-plugin
3.1.0

false



package

shade




com.google.code.findbugs:jsr305
org.slf4j:*
log4j:*





com.haier.cosmodata.source.MyDataStreamSourceDemo


reference.conf




*:*:*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA












  • StreamWordCount

package com.sgg.bigdata;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 流式处理WordCount
* Created by huqian on 2020/5/23 22:24
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建一个流处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//接受socket数据流
DataStreamSource textDataSteam = env.socketTextStream("localhost",7777);
//逐一读取数据,打散之后进行WordCount
SingleOutputStreamOperator> wordCountDataStream = textDataSteam
.flatMap(new FlatMapFunction>() {
public void flatMap(String s, Collector> collector) throws Exception {
String[] tokens = s.split(" ");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2(token, 1));
}
}
}
})
.filter(new FilterFunction>() {
public boolean filter(Tuple2 stringIntegerTuple2) throws Exception {
if (stringIntegerTuple2.equals(null)) {
return false;
}
return true;
}
})
.keyBy(0)
.sum(1);
//打印输出
wordCountDataStream.print();
//执行任务
env.execute("StreamWordCountJob");
//测试需要开启端口7777
}
}

-- 测试
技术图片
技术图片


推荐阅读
  • 本文深入探讨了如何利用Maven高效管理项目中的外部依赖库。通过介绍Maven的官方依赖搜索地址(),详细讲解了依赖库的添加、版本管理和冲突解决等关键操作。此外,还提供了实用的配置示例和最佳实践,帮助开发者优化项目构建流程,提高开发效率。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 解决Only fullscreen opaque activities can request orientation错误的方法
    本文介绍了在使用PictureSelectorLight第三方框架时遇到的Only fullscreen opaque activities can request orientation错误,并提供了一种有效的解决方案。 ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 如何使用 `org.apache.tomcat.websocket.server.WsServerContainer.findMapping()` 方法及其代码示例解析 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • Java能否直接通过HTTP将字节流绕过HEAP写入SD卡? ... [详细]
  • 今天我开始学习Flutter,并在Android Studio 3.5.3中创建了一个新的Flutter项目。然而,在首次尝试运行时遇到了问题,Gradle任务 `assembleDebug` 执行失败,退出状态码为1。经过初步排查,发现可能是由于依赖项配置不当或Gradle版本不兼容导致的。为了解决这个问题,我计划检查项目的 `build.gradle` 文件,确保所有依赖项和插件版本都符合要求,并尝试更新Gradle版本。此外,还将验证环境变量配置是否正确,以确保开发环境的稳定性。 ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有