1. Create a Maven project and add dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
2. WordCount code
package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FileWordCount {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the input data from a file
        String inputPath = "data/wordcount.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Split each line on spaces, turn each word into a (word, 1) tuple, then aggregate
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet
                .flatMap(new MyflatMapper())
                .groupBy(0)   // group by the word (tuple field 0)
                .sum(1);      // sum the counts (tuple field 1)

        resultSet.print();
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyflatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // Split the line on spaces
            String[] words = s.split(" ");
            // Emit each word as a (word, 1) tuple
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
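The pom above also pulls in flink-streaming-java, which the batch example does not use. For comparison, here is a minimal sketch of the same word count written against the DataStream API of Flink 1.10; the class name StreamWordCount is hypothetical and not part of the original post:

package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> inputStream = env.readTextFile("data/wordcount.txt");

        DataStream<Tuple2<String, Integer>> resultStream = inputStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> out) {
                        for (String word : s.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)   // key by the word (tuple field 0)
                .sum(1);    // running sum of counts (tuple field 1)

        resultStream.print();

        // Unlike the batch API, a streaming job must be started explicitly
        env.execute("stream word count");
    }
}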
3. Data file wordcount.txt
hello world
hello spark
hello flink
Create a data directory under the project root, and create the wordcount.txt file with the contents above inside that directory (or generate it from code, as in the sketch below).
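If you prefer to create the sample file programmatically, here is a minimal sketch using java.nio.file; the class name CreateSampleData is hypothetical:

package com.demo;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class CreateSampleData {
    public static void main(String[] args) throws Exception {
        // Create the data directory if it does not exist yet
        Path dir = Paths.get("data");
        Files.createDirectories(dir);
        // Write the three sample lines into data/wordcount.txt
        Files.write(dir.resolve("wordcount.txt"),
                Arrays.asList("hello world", "hello spark", "hello flink"));
    }
}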
4. Run the program
Run FileWordCount; the output is as follows:
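With the sample data above, the job prints one Tuple2 per distinct word; the counts below follow from the input, though the ordering of the lines may vary between runs:

(hello,3)
(world,1)
(spark,1)
(flink,1)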