热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【大数据开发】Flink之DataSet数据源(七)

点击蓝字 关注我们Flink之DataSet数据源(七)01Flink Source之Text数据源DataSet localLines = env.readT

点击蓝字 关注我们


Flink之DataSet数据源(七)

01

Flink Source之Text数据源

DataSet<String> localLines = env.readTextFlie("file:///path/to/my/textfile");

支持本地文件地址或者HDFS文件地址


//Tuple类


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class TextSourceJava1Demo {


public static void main(String[] args) throws Exception{
//批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


        DataSource<String> source = env.readTextFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount");


DataSetString,Integer>> dataSet = source
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, CollectorString, Integer>> collector) throws Exception {
String[] words = s.split(" ");
for(String word : words){
collector.collect(new Tuple2<>(word,1));
}
}
})
.groupBy("f0")
.reduce(new ReduceFunctionString, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<>(t1.f0,t1.f1+t2.f1);
}
});


dataSet.print();


}


//POJO类


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;


public class TextSourceJava2Demo {


public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


        DataSource source = env.readTextFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount");


DataSet dataSet = source
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String s, Collector collector) throws Exception {
String[] words = s.split(" ");
for(String word : words ){
collector.collect(new WordCount(word,1));
}
}
})
.groupBy("word")
.reduce(new ReduceFunction() {
@Override
public WordCount reduce(WordCount t1, WordCount t2) throws Exception {
return new WordCount(t1.word,t1.count+t2.count);
}
});


dataSet.print();


}




public static class WordCount{
private String word;
private int count;


public WordCount(String word, int count) {
this.word = word;
this.count = count;
}


public WordCount(){


}


@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}


}




02

Flink Source之Json数据源

DataSet<String> localLines = env.readTextFlie("file:///path/to/my/jsonfile");

支持本地文件地址或者HDFS文件地址


■ 解析JSON,需要添加依赖包jackson-databind

pom.xml中添加:

<dependency>
<groupId>com.fasterxml.jackson.coregroupId>
<artifactId>jackson-databindartifactId>
   <version>2.10.0version>
dependency>


import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;


public class JsonSourceJavaDemo {


//调用Json解析类
final static ObjectMapper objectMapper = new ObjectMapper();


public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


        DataSource source = env.readTextFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount.json");
DataSet dataSet = source
//与text的flatmap不同,需要对json格式进行转换,所以用map
.map(new MapFunction() {
@Override
public WordCountPOJO map(String line) throws Exception {
//调用objectMapper解析Json行数据
WordCountPOJO wordCountPOJO = objectMapper.readValue(line,WordCountPOJO.class);
return wordCountPOJO;
}
})
.groupBy("word")
.reduce(new ReduceFunction() {
@Override
public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
return new WordCountPOJO(t1.word,t1.count+t2.count);
}
});


dataSet.print();


}


public static class WordCountPOJO{
public String word ;
public int count;


public WordCountPOJO(String word, int count) {
this.word = word;
this.count = count;
}


public WordCountPOJO() {
}


@Override
public String toString() {
return "WordCountPOJO{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}



03

Flink Source之Csv数据源

DataSet<String> localLines = env.readCsvFile("file:///path/to/my/csvfile");

支持本地文件地址或者HDFS文件地址


//Tuple类
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;


public class CsvSourceJava1Demo {


public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


DataSet<Tuple2<String,Integer>> dataSet = env
                .readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount.csv")
//1表示输出,0表示不输出;1的个数与types类型个数要匹配
.includeFields("110")
.types(String.class,Integer.class)
.groupBy("f0")
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<>(t1.f0,t1.f1+t2.f1);
}
});


dataSet.print();


}
}

//POJO类


import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;


public class CsvSourceJava2Demo {


public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


DataSet dataSet = env
                .readCsvFile("file:///Users/zhangjingyu/Desktop/flink/src/main/resources/wordcount.csv")
.pojoType(WordCountPOJO.class,"word","count")
.groupBy("word")
.reduce(new ReduceFunction() {
@Override
public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
return new WordCountPOJO(t1.word,t1.count+t2.count);
}
});
dataSet.print();
}


public static class WordCountPOJO{
public String word ;
public int count;


public WordCountPOJO(String word, int count) {
this.word = word;
this.count = count;
}


public WordCountPOJO() {
}


@Override
public String toString() {
return "WordCountPOJO{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}



04

Flink Source之Mysql数据源

■ Flink-JDBC依赖包

pom.xml添加:

<dependency>
   <groupId>org.apache.flinkgroupId>
   <artifactId>flink-jdbc_2.12artifactId>
   <version>1.7.2version>
dependency>


■ 添加mysql-connector-java依赖包

pom.xml添加:

<dependency>
   <groupId>mysqlgroupId>
   <artifactId>mysql-connector-javaartifactId>
   <version>5.1.25version>
dependency>


■ 创建MySQL表

create database flink;
create table wordcount(word varchar(100) NOT NULL,count int NOT NULL)
insert into wordcount(word,count) values("spark",1);
insert into wordcount(word,count) values("hbase",10);
insert into wordcount(word,count) values("hbase",20);
insert into wordcount(word,count) values("java",10);


//Tuple类


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;


public class MysqlSourceJava1Demo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


DataSet<Row> dbData =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
.setUsername("root")
.setPassword("12345678")
.setQuery("select * from wordcount")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);
DataSet<Tuple2<String,Integer>> result = dbData.map(new MapFunction<Row, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Row row) throws Exception {
return new Tuple2<>(String.valueOf(row.getField(0)),(Integer)row.getField(1));
}
})
.groupBy("f0")
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
}
});


result.print();
}
}

//POJO


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;


public class MysqlSourceJava2Demo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet dbData =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
.setUsername("root")
.setPassword("12345678")
.setQuery("select * from wordcount")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);


DataSet result = dbData.map(new MapFunction() {
@Override
public WordCountPOJO map(Row row) throws Exception {
return new WordCountPOJO(String.valueOf(row.getField(0)),(Integer) row.getField(1));
}
})
.groupBy("word")
.reduce(new ReduceFunction() {
@Override
public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
return new WordCountPOJO(t1.word,t1.count + t2.count);
}
});
result.print();
}


public static class WordCountPOJO{
private String word ;
private int count ;


public WordCountPOJO(String word, int count) {
this.word = word;
this.count = count;
}


public WordCountPOJO() {
}


@Override
public String toString() {
return "WordCountPOJO{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

扫描二维码

关注我们

微信号 : BIGDT_IN 




推荐阅读
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • 正则表达式及其范例
    为什么80%的码农都做不了架构师?一、前言部分控制台输入的字符串,编译成java字符串之后才送进内存,比如控制台打\, ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • Html5-Canvas实现简易的抽奖转盘效果
    本文介绍了如何使用Html5和Canvas标签来实现简易的抽奖转盘效果,同时使用了jQueryRotate.js旋转插件。文章中给出了主要的html和css代码,并展示了实现的基本效果。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • 如何优化Webpack打包后的代码分割
    本文介绍了如何通过优化Webpack的代码分割来减小打包后的文件大小。主要包括拆分业务逻辑代码和引入第三方包的代码、配置Webpack插件、异步代码的处理、代码分割重命名、配置vendors和cacheGroups等方面的内容。通过合理配置和优化,可以有效减小打包后的文件大小,提高应用的加载速度。 ... [详细]
author-avatar
Posion丶丨
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有