二、Streaming example: async dimension-table lookup join (results written to Kafka)
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.util.Properties;
public class LookUpAsyncTest {

    @Test
    public void test() throws Exception {
        LookUpAsyncTest.main(new String[]{});
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        final ParameterTool params = ParameterTool.fromArgs(args);
        String fileName;
        if (params.get("f") != null) {
            fileName = params.get("f");
        } else {
            fileName = "/flink/userClick_Random_100W";
        }
        DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");

        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
        String[] fields = new String[]{"id", "user_click", "time"};
        RowTypeInfo typeInformation = new RowTypeInfo(types, fields);

        // Parse each CSV line into a Row according to the declared field types.
        DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
            private static final long serialVersionUID = 2349572544179673349L;

            @Override
            public Row map(String s) {
                String[] split = s.split(",");
                Row row = new Row(split.length);
                for (int i = 0; i < split.length; i++) {
                    // Columns are strings by default; convert the ones declared as LONG.
                    Object value = split[i];
                    if (types[i].equals(Types.LONG)) {
                        value = Long.valueOf(split[i]);
                    }
                    row.setField(i, value);
                }
                return row;
            }
        }).returns(typeInformation);

        // Register the stream as a table and append a processing-time attribute.
        tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");

        // Register the Redis-backed dimension table (custom lookup table source, not shown here).
        RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()
                .withFieldNames(new String[]{"id", "name"})
                .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})
                .build();
        tableEnv.registerTableSource("info", tableSource);

        // Lookup (temporal table) join on processing time.
        String sql = "select t1.id,t1.user_click,t2.name" +
                " from user_click_name as t1" +
                " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +
                " on t1.id = t2.id";
        Table table = tableEnv.sqlQuery(sql);

        DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
        // result.print().setParallelism(1);

        DataStream<String> printStream = result.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row value) throws Exception {
                return value.toString();
            }
        });

        // Write the join result to Kafka.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.16.12.148:9094");
        FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(
                "user_click_name",        // target topic
                new SimpleStringSchema(), // serialization schema
                properties);
        printStream.addSink(kafkaProducer);

        // Submit the job (the class name is used as the job name).
        tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());
    }
}
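Note that RedisAsyncLookupTableSource is a custom table source that is not included in this post, so the example above will not compile without it. As a rough reference only, the async lookup function such a source would return from getAsyncLookupFunction() could look like the minimal, hypothetical sketch below; an in-memory map stands in for the real Redis client, and the class name and sample data are illustrative, not the author's implementation:
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the lookup function behind RedisAsyncLookupTableSource.
public class AsyncLookupFunctionSketch extends AsyncTableFunction<Row> {

    private transient Map<String, String> dim;

    @Override
    public void open(FunctionContext context) {
        // A real implementation would open an async Redis connection here.
        dim = new HashMap<>();
        dim.put("1", "name-1");
    }

    // The Blink planner calls eval with a CompletableFuture that must be completed
    // with the rows matching the lookup key(s), here the "id" column.
    public void eval(CompletableFuture<Collection<Row>> future, String id) {
        CompletableFuture
                .supplyAsync(() -> dim.get(id)) // asynchronous fetch (placeholder for a Redis call)
                .thenAccept(name -> future.complete(
                        name == null
                                ? Collections.<Row>emptyList()
                                : Collections.singletonList(Row.of(id, name))));
    }
}
The full table source would additionally implement LookupableTableSource (getLookupFunction, getAsyncLookupFunction, isAsyncEnabled) together with the usual TableSource methods such as getTableSchema.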
三、Batch example: Table API & SQL on a CSV file
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class TableSQL {

    public static void main(String[] args) throws Exception {
        // 1. Obtain the batch execution environment and the table environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // 2. Read score.csv.
        DataSet<String> input = env.readTextFile("score.csv");
        input.print();

        DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
            @Override
            public PlayerData map(String s) throws Exception {
                String[] split = s.split(",");
                return new PlayerData(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        String.valueOf(split[2]),
                        Integer.valueOf(split[3]),
                        Double.valueOf(split[4]),
                        Double.valueOf(split[5]),
                        Double.valueOf(split[6]),
                        Double.valueOf(split[7]),
                        Double.valueOf(split[8])
                );
            }
        });

        // 3. Register the DataSet as an in-memory table.
        Table topScore = tableEnv.fromDataSet(topInput);
        tableEnv.registerTable("score", topScore);

        // 4. Write the SQL and submit it.
        //select player, count(season) as num from score group by player order by num desc;
        Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3");

        // 5. Print the result.
        DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
        result.print();
    }

    public static class PlayerData {
        /**
         * season, player, games played, games started, minutes, assists, steals, blocks, points
         */
        public String season;
        public String player;
        public String play_num;
        public Integer first_court;
        public Double time;
        public Double assists;
        public Double steals;
        public Double blocks;
        public Double scores;

        public PlayerData() {
            super();
        }

        public PlayerData(String season,
                          String player,
                          String play_num,
                          Integer first_court,
                          Double time,
                          Double assists,
                          Double steals,
                          Double blocks,
                          Double scores) {
            this.season = season;
            this.player = player;
            this.play_num = play_num;
            this.first_court = first_court;
            this.time = time;
            this.assists = assists;
            this.steals = steals;
            this.blocks = blocks;
            this.scores = scores;
        }
    }

    public static class Result {
        public String player;
        public Long num;

        public Result() {
            super();
        }

        public Result(String player, Long num) {
            this.player = player;
            this.num = num;
        }

        @Override
        public String toString() {
            return player + ":" + num;
        }
    }
}
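The SQL in step 4 can also be expressed with the Table API's expression DSL instead of plain SQL. A minimal sketch, assuming the expression-string syntax of the Flink 1.9 Table API (the variable name top3 is illustrative):
// Table API equivalent of the SQL used in step 4.
Table top3 = tableEnv.scan("score")
        .groupBy("player")
        .select("player, season.count as num")
        .orderBy("num.desc")
        .fetch(3);
Like the SQL version, the resulting Table can be converted back to a DataSet with toDataSet and printed.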
Of course, we can also define a custom sink and write the result to a file, for example:
// Needs org.apache.flink.table.sinks.TableSink / CsvTableSink plus the TypeInformation/Types imports.
TableSink sink = new CsvTableSink("/home/result.csv", ",");
String[] fieldNames = {"player", "num"};
TypeInformation[] fieldTypes = {Types.STRING, Types.LONG}; // count(...) produces a BIGINT/Long
tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
queryResult.insertInto("result");
env.execute();
Then run the program. A result.csv file is generated under /home; its contents are:
迈克尔-乔丹,10
凯文-杜兰特,4
阿伦-艾弗森,4