Flink使用java实现读取csv文件简单实例
- 首先我们来看官方文档中给出的几种方法:
首先我们来看官方文档中给出的几种方法:
第一种:
DataSet> csvInput = env.readCsvFile(“hdfs:///the/CSV/file”)
.types(Integer.class, String.class, Double.class);
第二种:
DataSet> csvInput = env.readCsvFile(“hdfs:///the/CSV/file”)
.includeFields(“10010”)
.types(String.class, Double.class);
第三种:
DataSet> csvInput = env.readCsvFile(“hdfs:///the/CSV/file”)
.pojoType(Person.class, “name”, “age”, “zipcode”);
其中第一种和第二种比较类似,无非就是后面要使用types定义接受的数据类型,对于includeFields() 方法,里面有两种传参方式,一种是includeFields(“10010”),另一种是includeFields(true,false。。。)。表示我们对于这个文件需要第几列,就写上1或者true即可,但是一定要注意一定要对应上即可
这里我们演示第三种,并且扩展一些参数的使用:
第三种是用一个POJO类型来接受数据,所以我们一定先要定义一个POJO的类,这里有很坑爹的地方就是,我们在定义属性之后,一定要加上这三个东西,
1.getter,setter方法
2.construct构造器
3.还有就是一个空的构造器,不带任何参数,这个一定要加上,否则java不识别这是一个POJO
4.把tostring方法也加上
其实我们文件中的内容如下:
name,age,add
fanglei,20,nishi
liujun,29,lalala,
woshi,18,tashi
但是我们假设不需要中间那一列,所以我们定义的POJO只有两个属性!
public class newPerson {private String name;private String address;public newPerson(){}public newPerson(String name, String address) {this.name = name;this.address = address;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "newPerson{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}
直接上例子
public class JavaDataSetDataSourceApp {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();csvfile(env);}public static void csvfile(ExecutionEnvironment env) throws Exception {String filePath = "C:\\Users\\77051\\Desktop\\hello.csv";DataSet csvInput = env.readCsvFile(filePath).fieldDelimiter(",").ignoreFirstLine().includeFields(true,false,true).pojoType(newPerson.class,"name","address");csvInput.print();}
}
fieldDelimiter(",")表示分隔符
ignoreFirstLine()表示是否忽略第一行
includeFields(true,false,true)表示不需要第二列
pojoType(newPerson.class,“name”,“address”);表示数据类型是newPerson,使用后面这两个属性
结果输出
newPerson{name='woshi', address='tashi'}
newPerson{name='fanglei', address='nishi'}
newPerson{name='liujun', address='lalala'}
但是有一个问题就是,当我的POJO中如果定义多了一个属性,用这种方法读取的话,结果也会把所有的age读出来,但是全部赋了0 结果如下:
Person{name=‘fanglei’, age=0, address=‘nishi’}
Person{name=‘liujun’, age=0, address=‘lalala’}
Person{name=‘woshi’, age=0, address=‘tashi’}
所以解决方法其实很简单,重写tostring方法即可,不要输出age就行了!