点击蓝字 关注我们
Flink之DataSet数据源(七)
01
Flink Source之Text数据源
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
支持本地文件地址或者HDFS文件地址
//Tuple类
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class TextSourceJava1Demo {
public static void main(String[] args) throws Exception{
//批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> source = env.readTextFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount");
DataSet
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector
String[] words = s.split(" ");
for(String word : words){
collector.collect(new Tuple2<>(word,1));
}
}
})
.groupBy("f0")
.reduce(new ReduceFunction
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<>(t1.f0,t1.f1+t2.f1);
}
});
dataSet.print();
}
//POJO类
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
public class TextSourceJava2Demo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource
DataSet
.flatMap(new FlatMapFunction
@Override
public void flatMap(String s, Collector
String[] words = s.split(" ");
for(String word : words ){
collector.collect(new WordCount(word,1));
}
}
})
.groupBy("word")
.reduce(new ReduceFunction
@Override
public WordCount reduce(WordCount t1, WordCount t2) throws Exception {
return new WordCount(t1.word,t1.count+t2.count);
}
});
dataSet.print();
}
public static class WordCount{
private String word;
private int count;
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
public WordCount(){
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
02
Flink Source之Json数据源
DataSet<String> localLines = env.readTextFile("file:///path/to/my/jsonfile");
支持本地文件地址或者HDFS文件地址
■ 解析JSON,需要添加依赖包jackson-databind
pom.xml中添加:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.0</version>
</dependency>
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
public class JsonSourceJavaDemo {
//调用Json解析类
final static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource
DataSet
//与text的flatmap不同,需要对json格式进行转换,所以用map
.map(new MapFunction
@Override
public WordCountPOJO map(String line) throws Exception {
//调用objectMapper解析Json行数据
WordCountPOJO wordCountPOJO = objectMapper.readValue(line,WordCountPOJO.class);
return wordCountPOJO;
}
})
.groupBy("word")
.reduce(new ReduceFunction
@Override
public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
return new WordCountPOJO(t1.word,t1.count+t2.count);
}
});
dataSet.print();
}
public static class WordCountPOJO{
public String word ;
public int count;
public WordCountPOJO(String word, int count) {
this.word = word;
this.count = count;
}
public WordCountPOJO() {
}
@Override
public String toString() {
return "WordCountPOJO{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
03
Flink Source之Csv数据源
DataSet<String> localLines = env.readCsvFile("file:///path/to/my/csvfile");
支持本地文件地址或者HDFS文件地址
//Tuple类
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
/**
 * Word count over a CSV file using the DataSet API, reading rows directly
 * into {@code Tuple2<word, count>} via {@code readCsvFile}.
 */
public class CsvSourceJava1Demo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> counts = environment
                .readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount.csv")
                // Field mask: '1' keeps a column, '0' drops it; the number of
                // '1's must match the number of types() arguments below.
                .includeFields("110")
                .types(String.class, Integer.class)
                // Group on the tuple's first field (the word).
                .groupBy("f0")
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> left,
                                                          Tuple2<String, Integer> right) throws Exception {
                        int total = left.f1 + right.f1;
                        return new Tuple2<>(left.f0, total);
                    }
                });
        counts.print();
    }
}
//POJO类
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class CsvSourceJava2Demo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet
.readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount.csv")
.pojoType(WordCountPOJO.class,"word","count")
.groupBy("word")
.reduce(new ReduceFunction
@Override
public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
return new WordCountPOJO(t1.word,t1.count+t2.count);
}
});
dataSet.print();
}
public static class WordCountPOJO{
public String word ;
public int count;
public WordCountPOJO(String word, int count) {
this.word = word;
this.count = count;
}
public WordCountPOJO() {
}
@Override
public String toString() {
return "WordCountPOJO{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
04
Flink Source之Mysql数据源
■ Flink-JDBC依赖包
pom.xml添加:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
■ 添加mysql-connector-java依赖包
pom.xml添加:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.25</version>
</dependency>
■ 创建MySQL表
create database flink;
create table wordcount(word varchar(100) NOT NULL, count int NOT NULL);
insert into wordcount(word,count) values("spark",1);
insert into wordcount(word,count) values("hbase",10);
insert into wordcount(word,count) values("hbase",20);
insert into wordcount(word,count) values("java",10);
//Tuple类
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
/**
 * Word count over rows read from MySQL via JDBCInputFormat, with each row
 * converted to a {@code Tuple2<word, count>}.
 */
public class MysqlSourceJava1Demo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        // Declare the row schema up front: (VARCHAR word, INT count).
        RowTypeInfo rowSchema = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
                .setUsername("root")
                .setPassword("12345678")
                .setQuery("select * from wordcount")
                .setRowTypeInfo(rowSchema)
                .finish();
        DataSet<Row> rows = environment.createInput(inputFormat);

        DataSet<Tuple2<String, Integer>> result = rows
                .map(new MapFunction<Row, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Row record) throws Exception {
                        // Column 0 -> word, column 1 -> count.
                        String word = String.valueOf(record.getField(0));
                        Integer count = (Integer) record.getField(1);
                        return new Tuple2<>(word, count);
                    }
                })
                .groupBy("f0")
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> left,
                                                          Tuple2<String, Integer> right) throws Exception {
                        return new Tuple2<>(left.f0, left.f1 + right.f1);
                    }
                });
        result.print();
    }
}
//POJO
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
public class MysqlSourceJava2Demo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
.setUsername("root")
.setPassword("12345678")
.setQuery("select * from wordcount")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);
DataSet
@Override
public WordCountPOJO map(Row row) throws Exception {
return new WordCountPOJO(String.valueOf(row.getField(0)),(Integer) row.getField(1));
}
})
.groupBy("word")
.reduce(new ReduceFunction
@Override
public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
return new WordCountPOJO(t1.word,t1.count + t2.count);
}
});
result.print();
}
public static class WordCountPOJO{
private String word ;
private int count ;
public WordCountPOJO(String word, int count) {
this.word = word;
this.count = count;
}
public WordCountPOJO() {
}
@Override
public String toString() {
return "WordCountPOJO{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
扫描二维码
关注我们
微信号 : BIGDT_IN