本文整理了Java中org.apache.flink.api.java.ExecutionEnvironment.readCsvFile()
方法的一些代码示例,展示了ExecutionEnvironment.readCsvFile()
的具体用法。这些代码示例主要来源于 Github/Stackoverflow/Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ExecutionEnvironment.readCsvFile()
方法的具体详情如下:
包路径:org.apache.flink.api.java.ExecutionEnvironment
类名称:ExecutionEnvironment
方法名:readCsvFile
[英]Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to define parameters and field types and will eventually produce the DataSet that corresponds to the read and parsed CSV input.
[中]创建CSV读取器以读取逗号分隔值(CSV)文件。读取器可以选择定义参数和字段类型,并最终生成与读取和解析的CSV输入相对应的数据集。
代码示例来源:origin: apache/flink
// Reads a '|'-delimited CSV file (TPC-H "nation"-style data) into a DataSet,
// keeping only the columns flagged '1' in the include mask and typing them
// as (Integer, String).
// NOTE(review): the generic return type, method name, and parameter list were
// stripped by the HTML scraper (angle-bracketed text lost) — signature is incomplete.
private static DataSet
return env.readCsvFile(nationPath)
.fieldDelimiter("|")
// Column-selection mask — presumably '1' reads a column, '0' skips it; confirm against Flink CsvReader docs.
.includeFields("1100")
.types(Integer.class, String.class);
}
}
代码示例来源:origin: apache/flink
// Reads a '|'-delimited CSV file (TPC-H "orders"-style data), selecting three
// of the first nine columns via the include mask, typed (Integer, Integer, String).
// NOTE(review): signature truncated by the scraper — generic type args and parameters lost.
private static DataSet
return env.readCsvFile(ordersPath)
.fieldDelimiter("|")
.includeFields("110010000")
.types(Integer.class, Integer.class, String.class);
}
代码示例来源:origin: apache/flink
// Reads a '|'-delimited CSV file (TPC-H "customer"-style data), selecting five
// columns via the include mask, typed (Integer, String, String, Integer, Double).
// NOTE(review): signature truncated by the scraper — generic type args and parameters lost.
private static DataSet
return env.readCsvFile(customerPath)
.fieldDelimiter("|")
.includeFields("11110100")
.types(Integer.class, String.class, String.class, Integer.class, Double.class);
}
代码示例来源:origin: apache/flink
// Reads a '|'-delimited CSV file (TPC-H "lineitem"-style data), selecting four
// of sixteen columns via the include mask, typed (Integer, Double, Double, String).
// NOTE(review): signature truncated by the scraper — generic type args and parameters lost.
private static DataSet
return env.readCsvFile(lineitemPath)
.fieldDelimiter("|")
.includeFields("1000011010000000")
.types(Integer.class, Double.class, Double.class, String.class);
}
代码示例来源:origin: apache/flink
// Builds the "documents" relation (URL, Doc-Text) for the WebLogAnalysis example:
// reads a '|'-delimited CSV when a --documents path is supplied, otherwise falls
// back to the bundled WebLogData sample set.
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
// Create DataSet for documents relation (URL, Doc-Text)
if (params.has("documents")) {
return env.readCsvFile(params.get("documents"))
.fieldDelimiter("|")
.types(String.class, String.class);
} else {
System.out.println("Executing WebLogAnalysis example with default documents data set.");
System.out.println("Use --documents to specify file input.");
return WebLogData.getDocumentDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Builds the "ranks" relation (Rank, URL, Avg-Visit-Duration) for WebLogAnalysis:
// reads a '|'-delimited CSV when --ranks is supplied, else uses the bundled sample.
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
// Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
if (params.has("ranks")) {
return env.readCsvFile(params.get("ranks"))
.fieldDelimiter("|")
.types(Integer.class, String.class, Integer.class);
} else {
System.out.println("Executing WebLogAnalysis example with default ranks data set.");
System.out.println("Use --ranks to specify file input.");
return WebLogData.getRankDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Builds the edge data set for the Connected Components example: reads a
// space-delimited CSV of (Long, Long) pairs when --edges is supplied, otherwise
// uses the bundled default edge data.
// NOTE(review): generic return type and parameter list stripped by the scraper;
// the trailing extra '}' suggests this snippet also closes its enclosing class.
private static DataSet
if (params.has("edges")) {
return env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
} else {
System.out.println("Executing Connected Components example with default edges data set.");
System.out.println("Use --edges to specify file input.");
return ConnectedComponentsData.getDefaultEdgeDataSet(env);
}
}
}
代码示例来源:origin: apache/flink
// Builds (user, song, playCount) triplets for the MusicProfiles example: reads a
// tab-delimited, newline-terminated CSV when file input is enabled, otherwise
// uses the bundled MusicProfilesData sample.
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
if (fileOutput) {
return env.readCsvFile(userSongTripletsInputPath)
.lineDelimiter("\n").fieldDelimiter("\t")
.types(String.class, String.class, Integer.class);
} else {
return MusicProfilesData.getUserSongTriplets(env);
}
}
代码示例来源:origin: apache/flink
// Builds the vertex data set for Connected Components: reads single-column
// Long CSV records when --vertices is supplied and unwraps each Tuple1 to its
// Long value; otherwise uses the bundled default vertex data.
// NOTE(review): generic type args stripped by the scraper — the MapFunction's
// type parameters and Tuple1 element type are missing, and the signature is incomplete.
private static DataSet
if (params.has("vertices")) {
return env.readCsvFile(params.get("vertices")).types(Long.class).map(
new MapFunction
public Long map(Tuple1
// Unwrap the single-field tuple to its raw Long value.
return value.f0;
}
});
} else {
System.out.println("Executing Connected Components example with default vertices data set.");
System.out.println("Use --vertices to specify file input.");
return ConnectedComponentsData.getDefaultVertexDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Builds the vertex data set for the EuclideanGraph example: reads
// (id, x, y) CSV rows and maps each one to a Vertex whose value is a
// Point(x, y); falls back to the bundled sample when file input is disabled.
// NOTE(review): generic type args stripped by the scraper — the DataSet/Vertex/
// MapFunction type parameters and method signature are incomplete.
private static DataSet
if (fileOutput) {
return env.readCsvFile(verticesInputPath)
.lineDelimiter("\n")
.types(Long.class, Double.class, Double.class)
.map(new MapFunction
@Override
public Vertex
// (f0, f1, f2) = (vertex id, x-coordinate, y-coordinate).
return new Vertex<>(value.f0, new Point(value.f1, value.f2));
}
});
} else {
return EuclideanGraphData.getDefaultVertexDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Builds the "visits" relation (URL, Date) for WebLogAnalysis: reads a
// '|'-delimited CSV keeping only two of nine columns (per the include mask)
// when --visits is supplied; otherwise uses the bundled sample.
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
// Create DataSet for visits relation (URL, Date)
if (params.has("visits")) {
return env.readCsvFile(params.get("visits"))
.fieldDelimiter("|")
.includeFields("011000000")
.types(String.class, String.class);
} else {
System.out.println("Executing WebLogAnalysis example with default visits data set.");
System.out.println("Use --visits to specify file input.");
return WebLogData.getVisitDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Builds the links data set for the PageRank example: reads space-delimited,
// newline-terminated (Long, Long) pairs when --links is supplied; otherwise
// uses the bundled default edge data.
// NOTE(review): generic return type and parameter list stripped by the scraper;
// the trailing extra '}' suggests this snippet also closes its enclosing class.
private static DataSet
if (params.has("links")) {
return env.readCsvFile(params.get("links"))
.fieldDelimiter(" ")
.lineDelimiter("\n")
.types(Long.class, Long.class);
} else {
System.out.println("Executing PageRank example with default links data set.");
System.out.println("Use --links to specify file input.");
return PageRankData.getDefaultEdgeDataSet(env);
}
}
}
代码示例来源:origin: apache/flink
// Builds the edge data set for the EuclideanGraph example: reads (Long, Long)
// CSV pairs and maps each to an Edge with an initial value of 0.0 (the edge
// weight is presumably computed later); falls back to the bundled sample when
// file input is disabled.
// NOTE(review): generic type args stripped by the scraper — DataSet/Edge/
// MapFunction type parameters and the method signature are incomplete; the
// trailing extra '}' suggests this snippet also closes its enclosing class.
private static DataSet
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
.lineDelimiter("\n")
.types(Long.class, Long.class)
.map(new MapFunction
@Override
public Edge
// Edge(source, target, initial value 0.0).
return new Edge<>(tuple2.f0, tuple2.f1, 0.0);
}
});
} else {
return EuclideanGraphData.getDefaultEdgeDataSet(env);
}
}
}
代码示例来源:origin: apache/flink
// Builds the edge data set for Single-Source Shortest Paths: reads
// tab-delimited (Long, Long, Double) rows and converts each Tuple3 to an Edge
// via Tuple3ToEdgeMap; falls back to the bundled sample when file input is disabled.
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
.fieldDelimiter("\t")
.lineDelimiter("\n")
.types(Long.class, Long.class, Double.class)
.map(new Tuple3ToEdgeMap<>());
} else {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Variant of the SSSP edge loader (same behavior as the snippet above, with the
// lineDelimiter/fieldDelimiter calls in the opposite order — order is not
// significant for the CsvReader builder).
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
.lineDelimiter("\n")
.fieldDelimiter("\t")
.types(Long.class, Long.class, Double.class)
.map(new Tuple3ToEdgeMap<>());
} else {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Builds the pages data set for PageRank: reads single-column Long CSV records
// when --pages is supplied and unwraps each Tuple1 to its Long value; otherwise
// uses the bundled default pages data.
// NOTE(review): generic type args stripped by the scraper — MapFunction/Tuple1
// type parameters and the method signature are incomplete.
private static DataSet
if (params.has("pages")) {
return env.readCsvFile(params.get("pages"))
.fieldDelimiter(" ")
.lineDelimiter("\n")
.types(Long.class)
.map(new MapFunction
@Override
public Long map(Tuple1
// Unwrap the single-field tuple to its raw Long value.
return v.f0;
}
});
} else {
System.out.println("Executing PageRank example with default pages data set.");
System.out.println("Use --pages to specify file input.");
return PageRankData.getDefaultPagesDataSet(env);
}
}
代码示例来源:origin: apache/flink
// SSSP edge loader that additionally skips comment lines: rows beginning with
// '%' are ignored via ignoreComments (common in Matrix-Market-style edge files).
// Otherwise identical to the SSSP loaders above.
// NOTE(review): generic return type and parameter list stripped by the scraper.
private static DataSet
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
.lineDelimiter("\n")
.fieldDelimiter("\t")
.ignoreComments("%")
.types(Long.class, Long.class, Double.class)
.map(new Tuple3ToEdgeMap<>());
} else {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}
}
代码示例来源:origin: apache/flink
// Integration test: writes two comma-separated rows to a temp file, reads them
// back with readCsvFile using Flink's mutable Value types (StringValue,
// BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue,
// DoubleValue), and compares the collected tuples against the raw input.
// NOTE(review): the scraper truncated two lines — the DataSet declaration on
// the line after `DataSet` (its generic Tuple8 type args are gone) and the
// `List` line (its element type and the collect() call are gone).
@Test
public void testValueTypes() throws Exception {
final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0";
final String dataPath = createInputData(inputData);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet
env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class);
List
expected = inputData;
compareResultAsTuples(result, expected);
}
代码示例来源:origin: apache/flink
// Entry point for the TestOptimizerPlan example: requires <input> and <output>
// path arguments, reads tab-delimited (Long, Long) pairs from args[0], applies
// an (anonymous, inline) map transformation, and writes the result back as
// tab-delimited CSV to args[1].
// NOTE(review): the scraper truncated several lines — the usage message string,
// the DataSet declarations' generic type args, the MapFunction type parameters,
// and the body of the Tuple2-producing map method are all incomplete here.
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
if (args.length <2) {
System.err.println("Usage: TestOptimizerPlan
return;
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet
.fieldDelimiter("\t").types(Long.class, Long.class);
DataSet
new MapFunction
public Tuple2
return new Tuple2
}
});
result.writeAsCsv(args[1], "\n", "\t");
env.execute();
}
代码示例来源:origin: apache/flink
// Duplicate of the preceding TestOptimizerPlan main() snippet — the aggregator
// page repeats it verbatim. Reads tab-delimited (Long, Long) pairs from
// args[0], maps them, and writes tab-delimited CSV to args[1].
// NOTE(review): same scrape damage as the previous snippet — usage string,
// generic type args, and the map body are truncated.
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
if (args.length <2) {
System.err.println("Usage: TestOptimizerPlan
return;
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet
.fieldDelimiter("\t").types(Long.class, Long.class);
DataSet
new MapFunction
public Tuple2
return new Tuple2
}
});
result.writeAsCsv(args[1], "\n", "\t");
env.execute();
}