Author: WINNIE双双围脖_370 | Source: Internet | 2023-08-31 11:43
Contents
I. Build a Flink project with the Maven command (or create it through IDEA)
1. Flink project generation command
2. Project directory structure
II. Write the WordCount code
1. Project structure overview
2. Stream-processing steps
3. Full code
4. Testing
I. Build a Flink project with the Maven command (or create it through IDEA)
1. Flink project generation command
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0
2. Project directory structure
(The original post shows a screenshot of the generated project layout here.)
II. Write the WordCount code
1. Project structure overview
The archetype generates two entry classes: BatchJob for batch processing and StreamingJob for stream processing. Here we use stream processing.
2. Stream-processing steps
// 1. Set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Define the data source (a socket listener here; a Kafka source etc. also works)
// On Windows, open cmd and run `nc -lp 9000` to open the port, then type input lines
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");
// 3. Transform and process the data
// 4. Execute the Flink program
env.execute("Flink Streaming Java API Skeleton");
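Conceptually, the flatMap / keyBy / window / reduce chain in step 3 just counts words per window. A minimal plain-Java sketch of that aggregation, with no Flink dependency (`WordCountSketch` and its method are illustrative names, not part of the generated project):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: what one window's flatMap + keyBy("word") + reduce
// computes for the lines received inside that window.
public class WordCountSketch {
    public static Map<String, Long> count(Iterable<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {  // same tokenizer as the Flink job
                counts.merge(word, 1L, Long::sum);   // same reduce: sum counts per key
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("hello world", "hello flink")));
        // prints {hello=2, world=1, flink=1}
    }
}
```

The difference in the real job is that Flink runs this aggregation in parallel per key and emits one result per word each time a five-second window closes.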
3. Full code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tangbb.cold.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the Flink Website.
 *
 * To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * If you change the name of the main class (with the public static void main(String[] args)
 * method), change the respective entry in the pom.xml file (simply search for 'mainClass').
 */
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Define the data source (a socket here; a Kafka source etc. also works)
        // On Windows, open cmd and run `nc -lp 9000` to open the port, then type input lines
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // 3. Transform and process the data
        DataStream<WordWithCount> wordWithCountDataStream = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                        for (String word : s.split("\\s")) {
                            collector.collect(new WordWithCount(word, 1));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount wordWithCount, WordWithCount t1) throws Exception {
                        return new WordWithCount(wordWithCount.word, wordWithCount.count + t1.count);
                    }
                });

        // Print the results with parallelism 1
        wordWithCountDataStream.print().setParallelism(1);

        // 4. Execute the Flink program
        env.execute("Flink Streaming Java API Skeleton");
    }

    /**
     * POJO holding a word and its count.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
4. Testing
To install the nc command on Windows, see this reference:
"How to use the nc command on Windows" — 百思不得小赵's blog, CSDN
1. In cmd, open port 9000 with nc and type test data
2. Start the word-count program in IDEA
The IDEA console then shows the counting results.
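Each printed line follows the POJO's toString format. As an illustration of what the console output looks like (the `OutputFormatDemo` class below is hypothetical; it simply replicates that toString):

```java
// Illustrative only: replicates WordWithCount.toString() to show the console format.
public class OutputFormatDemo {
    public static String format(String word, long count) {
        return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
    }

    public static void main(String[] args) {
        // Typing "hello world hello" into nc within one five-second window would print:
        System.out.println(format("hello", 2)); // WordWithCount{word='hello', count=2}
        System.out.println(format("world", 1)); // WordWithCount{word='world', count=1}
    }
}
```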