
Usage and code examples for org.apache.flink.api.java.ExecutionEnvironment.readCsvFile()

This article collects Java code examples for org.apache.flink.api.java.ExecutionEnvironment.readCsvFile() that show how the method is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as reference material. Details of ExecutionEnvironment.readCsvFile() are as follows:
Package path: org.apache.flink.api.java.ExecutionEnvironment
Class name: ExecutionEnvironment
Method name: readCsvFile

Introduction to ExecutionEnvironment.readCsvFile

Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to define parameters and field types and will eventually produce the DataSet that corresponds to the read and parsed CSV input.

Code examples

Code example source: apache/flink

private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env, String nationPath) {
    // Read only the first two of the four '|'-separated fields.
    return env.readCsvFile(nationPath)
        .fieldDelimiter("|")
        .includeFields("1100")
        .types(Integer.class, String.class);
}
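
The includeFields mask in the example above can be read position by position: a '1' keeps the field at that index, a '0' skips it, and fields past the end of the mask are not read. The following plain-Java sketch illustrates that reading of the mask on a single line. It does not use Flink, and the class name IncludeFieldsSketch, the select helper, and the sample row are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Plain-Java illustration of an includeFields-style mask. Not Flink code. */
final class IncludeFieldsSketch {

    // Hypothetical helper: keeps the fields whose mask character is '1'
    // and drops the fields marked '0' or lying beyond the end of the mask.
    static List<String> select(String line, char delimiter, String mask) {
        // Quote the delimiter so that characters such as '|' are not treated as regex syntax.
        String[] fields = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < mask.length() && i < fields.length; i++) {
            char c = mask.charAt(i);
            if (c == '1') {
                kept.add(fields[i]);
            } else if (c != '0') {
                throw new IllegalArgumentException("Mask may only contain '0' and '1': " + mask);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed sample row with four '|'-separated fields.
        System.out.println(select("0|ALGERIA|0|some comment", '|', "1100")); // prints [0, ALGERIA]
    }
}
```

With the mask "1100" only the first two fields survive, which matches the two classes passed to types(Integer.class, String.class) in the example: the number of '1' characters in the mask equals the number of types.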

Code example source: apache/flink

private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env, String ordersPath) {
    return env.readCsvFile(ordersPath)
        .fieldDelimiter("|")
        .includeFields("110010000")
        .types(Integer.class, Integer.class, String.class);
}

Code example source: apache/flink

private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env, String customerPath) {
    return env.readCsvFile(customerPath)
        .fieldDelimiter("|")
        .includeFields("11110100")
        .types(Integer.class, String.class, String.class, Integer.class, Double.class);
}

Code example source: apache/flink

private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env, String lineitemPath) {
    return env.readCsvFile(lineitemPath)
        .fieldDelimiter("|")
        .includeFields("1000011010000000")
        .types(Integer.class, Double.class, Double.class, String.class);
}

Code example source: apache/flink

private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env, ParameterTool params) {
    // Create DataSet for documents relation (URL, Doc-Text)
    if (params.has("documents")) {
        return env.readCsvFile(params.get("documents"))
            .fieldDelimiter("|")
            .types(String.class, String.class);
    } else {
        System.out.println("Executing WebLogAnalysis example with default documents data set.");
        System.out.println("Use --documents to specify file input.");
        return WebLogData.getDocumentDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env, ParameterTool params) {
    // Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
    if (params.has("ranks")) {
        return env.readCsvFile(params.get("ranks"))
            .fieldDelimiter("|")
            .types(Integer.class, String.class, Integer.class);
    } else {
        System.out.println("Executing WebLogAnalysis example with default ranks data set.");
        System.out.println("Use --ranks to specify file input.");
        return WebLogData.getRankDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env, ParameterTool params) {
    if (params.has("edges")) {
        return env.readCsvFile(params.get("edges"))
            .fieldDelimiter(" ")
            .types(Long.class, Long.class);
    } else {
        System.out.println("Executing Connected Components example with default edges data set.");
        System.out.println("Use --edges to specify file input.");
        return ConnectedComponentsData.getDefaultEdgeDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) {
    if (fileOutput) {
        return env.readCsvFile(userSongTripletsInputPath)
            .lineDelimiter("\n")
            .fieldDelimiter("\t")
            .types(String.class, String.class, Integer.class);
    } else {
        return MusicProfilesData.getUserSongTriplets(env);
    }
}

Code example source: apache/flink

private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env, ParameterTool params) {
    if (params.has("vertices")) {
        // Read a single Long column, then unwrap each Tuple1 into its value.
        return env.readCsvFile(params.get("vertices")).types(Long.class).map(
            new MapFunction<Tuple1<Long>, Long>() {
                public Long map(Tuple1<Long> value) {
                    return value.f0;
                }
            });
    } else {
        System.out.println("Executing Connected Components example with default vertices data set.");
        System.out.println("Use --vertices to specify file input.");
        return ConnectedComponentsData.getDefaultVertexDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
    if (fileOutput) {
        return env.readCsvFile(verticesInputPath)
            .lineDelimiter("\n")
            .types(Long.class, Double.class, Double.class)
            .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
                @Override
                public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
                    // Vertex ID plus (x, y) coordinates as the vertex value.
                    return new Vertex<>(value.f0, new Point(value.f1, value.f2));
                }
            });
    } else {
        return EuclideanGraphData.getDefaultVertexDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env, ParameterTool params) {
    // Create DataSet for visits relation (URL, Date)
    if (params.has("visits")) {
        return env.readCsvFile(params.get("visits"))
            .fieldDelimiter("|")
            .includeFields("011000000")
            .types(String.class, String.class);
    } else {
        System.out.println("Executing WebLogAnalysis example with default visits data set.");
        System.out.println("Use --visits to specify file input.");
        return WebLogData.getVisitDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env, ParameterTool params) {
    if (params.has("links")) {
        return env.readCsvFile(params.get("links"))
            .fieldDelimiter(" ")
            .lineDelimiter("\n")
            .types(Long.class, Long.class);
    } else {
        System.out.println("Executing PageRank example with default links data set.");
        System.out.println("Use --links to specify file input.");
        return PageRankData.getDefaultEdgeDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
    if (fileOutput) {
        return env.readCsvFile(edgesInputPath)
            .lineDelimiter("\n")
            .types(Long.class, Long.class)
            .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
                @Override
                public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
                    // Source ID, target ID, and an initial edge value of 0.0.
                    return new Edge<>(tuple2.f0, tuple2.f1, 0.0);
                }
            });
    } else {
        return EuclideanGraphData.getDefaultEdgeDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
    if (fileOutput) {
        return env.readCsvFile(edgesInputPath)
            .fieldDelimiter("\t")
            .lineDelimiter("\n")
            .types(Long.class, Long.class, Double.class)
            .map(new Tuple3ToEdgeMap<>());
    } else {
        return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
    if (fileOutput) {
        return env.readCsvFile(edgesInputPath)
            .lineDelimiter("\n")
            .fieldDelimiter("\t")
            .types(Long.class, Long.class, Double.class)
            .map(new Tuple3ToEdgeMap<>());
    } else {
        return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env, ParameterTool params) {
    if (params.has("pages")) {
        return env.readCsvFile(params.get("pages"))
            .fieldDelimiter(" ")
            .lineDelimiter("\n")
            .types(Long.class)
            .map(new MapFunction<Tuple1<Long>, Long>() {
                @Override
                public Long map(Tuple1<Long> v) {
                    return v.f0;
                }
            });
    } else {
        System.out.println("Executing PageRank example with default pages data set.");
        System.out.println("Use --pages to specify file input.");
        return PageRankData.getDefaultPagesDataSet(env);
    }
}

Code example source: apache/flink

private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
    if (fileOutput) {
        return env.readCsvFile(edgesInputPath)
            .lineDelimiter("\n")
            .fieldDelimiter("\t")
            .ignoreComments("%")
            .types(Long.class, Long.class, Double.class)
            .map(new Tuple3ToEdgeMap<>());
    } else {
        return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
    }
}
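
The ignoreComments("%") call in the example above tells the reader to skip lines that begin with the given prefix; only a prefix at the very start of a line counts as a comment. The effect on the input can be sketched in plain Java as below. This is an illustration and not Flink code: the class IgnoreCommentsSketch, the dropComments helper, and the sample lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of skipping comment lines by prefix. Not Flink code. */
final class IgnoreCommentsSketch {

    // Hypothetical helper: returns the lines that do not start with the comment prefix.
    static List<String> dropComments(List<String> lines, String commentPrefix) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            // A prefix that appears later in the line does not make the line a comment.
            if (!line.startsWith(commentPrefix)) {
                kept.add(line);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed sample input: one comment line followed by two tab-separated edge records.
        List<String> lines = Arrays.asList("% source target weight", "1\t2\t0.5", "2\t3\t1.5");
        for (String line : dropComments(lines, "%")) {
            System.out.println(line);
        }
    }
}
```

Only the two edge records remain after filtering, so the subsequent types(Long.class, Long.class, Double.class) call never sees the header-style comment line.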

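Code example source: apache/flink

@Test
public void testValueTypes() throws Exception {
    final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0";
    final String dataPath = createInputData(inputData);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Parse each column into Flink's mutable Value types instead of boxed Java types.
    DataSet<Tuple8<StringValue, BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue>> data =
        env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class,
            IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class);
    List<Tuple8<StringValue, BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue>> result = data.collect();
    expected = inputData;
    compareResultAsTuples(result, expected);
}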

Code example source: apache/flink

@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: TestOptimizerPlan ");
        return;
    }
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // args[0] is the input path, args[1] the output path.
    DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
        .fieldDelimiter("\t").types(Long.class, Long.class);
    DataSet<Tuple2<Long, Long>> result = input.map(
        new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
            }
        });
    // Write rows separated by "\n" and fields separated by "\t".
    result.writeAsCsv(args[1], "\n", "\t");
    env.execute();
}
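
The program above reads tab-separated pairs of longs, adds 1 to the second field of every record, and writes the records back with "\t" between fields and "\n" between rows. What happens to a single record can be sketched in plain Java as follows. This is an illustration under those assumptions and not Flink code; the class IncrementSecondFieldSketch and the transform helper are hypothetical names.

```java
/** Plain-Java illustration of the per-record transformation. Not Flink code. */
final class IncrementSecondFieldSketch {

    // Hypothetical helper: parses "<long>\t<long>", increments the second value,
    // and formats the result with the same tab delimiter.
    static String transform(String line) {
        String[] fields = line.split("\t", -1);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected exactly two tab-separated fields: " + line);
        }
        long first = Long.parseLong(fields[0]);
        long second = Long.parseLong(fields[1]);
        return first + "\t" + (second + 1);
    }

    public static void main(String[] args) {
        // Assumed sample input of two records separated by "\n".
        String input = "1\t2\n10\t20";
        StringBuilder output = new StringBuilder();
        for (String line : input.split("\n")) {
            output.append(transform(line)).append("\n");
        }
        System.out.print(output);
    }
}
```

In the Flink version the same logic runs inside the MapFunction, and the delimiters are supplied to fieldDelimiter on the way in and to writeAsCsv on the way out.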
