热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink入门单词统计程序开发

目录一.使用maven命令构建flink工程(或者通过idea构建也可以)1.flink工程构建命令2.项目工程目录结构如下二.编写wordcount的代码1.项目结构介绍

目录

一.使用maven 命令构建flink工程(或者通过idea构建也可以)

1.flink工程构建命令

2.项目工程目录结构如下

二.编写wordcount的代码 

1.项目结构介绍

2.流处理步骤介绍

3.具体代码

4.测试




一.使用maven 命令构建flink工程(或者通过idea构建也可以)


1.flink工程构建命令

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0

2.项目工程目录结构如下


二.编写wordcount的代码 


1.项目结构介绍

BatchJob为批处理

StreamingJob为流处理

我们这里使用流处理


2.流处理步骤介绍

//1.设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2.定义数据源(可以监听端口或者定义kafka数据源等等)
//win下打开cmd 执行nc -lp 9000打开端口,输入数据即可
DataStream text = env.socketTextStream("127.0.0.1",9000,"\n");

//3.数据的转换、处理

//4.执行flink程序
env.execute("Flink Streaming Java API Skeleton");

3.具体代码

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.tangbb.cold.flink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** Skeleton for a Flink Streaming Job.**

For a tutorial how to write a Flink streaming application, check the* tutorials and examples on the Flink Website.**

To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.**

If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environment 流的运行时环境//1.设置执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.定义数据源(可以监听端口或者定义kafka数据源等等)//win下打开cmd 执行nc -lp 9000打开端口,输入数据即可DataStream text = env.socketTextStream("127.0.0.1",9000,"\n");//3.数据的转换、处理DataStream wordWithCountDataStream = text.flatMap(new FlatMapFunction() {@Overridepublic void flatMap(String s, Collector collector) throws Exception {for (String word: s.split("\\s")){collector.collect(new WordWithCount(word,1));}}}).keyBy("word").timeWindow(Time.seconds(5)).reduce(new ReduceFunction() {@Overridepublic WordWithCount reduce(WordWithCount wordWithCount, WordWithCount t1) throws Exception {return new WordWithCount(wordWithCount.word,wordWithCount.count+t1.count);}});//3.打印wordWithCountDataStream.print().setParallelism(1);//4.执行flink程序env.execute("Flink Streaming Java API Skeleton");}/*** POJO*/public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word, long count) {this.word = word;this.count = count;}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}
}


4.测试

win下安装nc命令,参考链接如下

在Windows操作系统中怎样使用nc命令_百思不得小赵的博客-CSDN博客_windows nc

1.cmd打开9000端口发送数据测试

 2.启动idea单词统计程序

idea控制台展示统计结果


推荐阅读
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • IjustinheritedsomewebpageswhichusesMooTools.IneverusedMooTools.NowIneedtoaddsomef ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
author-avatar
WINNIE双双围脖_370
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有