作者: | 来源:互联网 | 2023-08-30 16:31
篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink快速上手之wordCount(java)相关的知识,希望对你有一定的参考价值。
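<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>MyFlink</groupId>
    <artifactId>MyFlink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- NOTE: this entry class is not part of this example; for the job below use com.sgg.bigdata.StreamWordCount -->
                                    <mainClass>com.haier.cosmodata.source.MyDataStreamSourceDemo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <!-- Strip signature files so the shaded jar is not rejected as tampered -->
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>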
package com.sgg.bigdata;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 流式处理WordCount
* Created by huqian on 2020/5/23 22:24
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建一个流处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//接受socket数据流
DataStreamSource textDataSteam = env.socketTextStream("localhost",7777);
//逐一读取数据,打散之后进行WordCount
SingleOutputStreamOperator> wordCountDataStream = textDataSteam
.flatMap(new FlatMapFunction>() {
public void flatMap(String s, Collector> collector) throws Exception {
String[] tokens = s.split(" ");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2(token, 1));
}
}
}
})
.filter(new FilterFunction>() {
public boolean filter(Tuple2 stringIntegerTuple2) throws Exception {
if (stringIntegerTuple2.equals(null)) {
return false;
}
return true;
}
})
.keyBy(0)
.sum(1);
//打印输出
wordCountDataStream.print();
//执行任务
env.execute("StreamWordCountJob");
//测试需要开启端口7777
}
}
-- 测试:先在本机执行 `nc -lk 7777` 开启 7777 端口,再运行 StreamWordCount,在 nc 终端输入以空格分隔的单词即可看到计数输出