Flink Table API & SQL: A First Look, Using the Blink Planner
Overview
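Flink provides the Table API and SQL, two relational APIs for unified stream and batch processing.
The Table API is a language-integrated query API for Scala and Java. It lets you compose queries from relational operators such as selection, filter and join in a very intuitive way.
Flink's SQL support is based on Apache Calcite, which implements the SQL standard. Whether the input is a batch input (DataSet) or a streaming input (DataStream), a query specified in either interface has the same semantics and produces the same result.
The Table API and SQL are not yet feature-complete and are under active development. Check the official documentation for the current level of support.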
Usage
Example: joining two tables
pom dependencies
Flink version: 1.9.3
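<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>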
Simulating a real-time stream
package common;

import lombok.Data;

// Record type emitted by the source; Lombok's @Data generates getters/setters
@Data
public class Product {
    public Integer id;
    public String seasonType;
}
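If you prefer not to depend on Lombok, the following is a minimal sketch of roughly what `@Data` provides for this class: getters, setters and a `toString` (the generated `equals`/`hashCode` are omitted for brevity, and the `package common;` line is left out so the snippet compiles on its own). It also shows what Flink expects from a POJO: a public class, a public no-argument constructor, and fields that are public or reachable through getters and setters. This is an illustrative equivalent, not Lombok's exact generated code.

```java
// Illustrative Lombok-free equivalent of Product (not Lombok's exact output).
public class Product {
    // Flink POJO rules: public class, public no-arg constructor,
    // fields public or accessible through getters/setters.
    public Integer id;
    public String seasonType;

    public Product() {}

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getSeasonType() { return seasonType; }
    public void setSeasonType(String seasonType) { this.seasonType = seasonType; }

    // Same shape as Lombok's toString: ClassName(field=value, ...)
    @Override
    public String toString() {
        return "Product(id=" + id + ", seasonType=" + seasonType + ")";
    }
}
```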
Custom Source
import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ProductStremingSource implements SourceFunction<Product> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Product> ctx) throws Exception {
        while (isRunning) {
            // Emit one record per second
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    private Product generateProduct() {
        int i = random.nextInt(100);
        List<String> list = new ArrayList<>();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(random.nextInt(4)));
        product.setId(i);
        return product;
    }

    @Override
    public void cancel() {
        // Stop the emit loop in run()
        isRunning = false;
    }
}
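The subtle part of a custom source is the run/cancel contract: Flink calls `cancel()` from a different thread than `run()`, so the loop flag has to be visible across threads. Below is a Flink-free sketch of that pattern. It assumes a plain `BiConsumer<Integer, String>` receiving `(id, seasonType)` pairs as a stand-in for `SourceContext`; the class name `CancellableSourceSketch` is hypothetical and only used for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.BiConsumer;

// Hypothetical Flink-free model of ProductStremingSource: same run/cancel contract,
// but records go to a plain BiConsumer<id, seasonType> instead of a SourceContext.
public class CancellableSourceSketch {
    private static final List<String> SEASONS =
            Arrays.asList("spring", "summer", "autumn", "winter");
    private final Random random = new Random();   // one instance, reused for every record
    private volatile boolean isRunning = true;    // written by cancel(), read by run()

    // Emits one (id, seasonType) pair every intervalMillis until cancel() is called.
    public void run(BiConsumer<Integer, String> out, long intervalMillis)
            throws InterruptedException {
        while (isRunning) {
            int id = random.nextInt(100);                                  // 0..99
            String season = SEASONS.get(random.nextInt(SEASONS.size()));   // one of four seasons
            out.accept(id, season);
            Thread.sleep(intervalMillis);
        }
    }

    // Typically called from another thread; run() exits after its current iteration.
    public void cancel() {
        isRunning = false;
    }
}
```

Without `volatile`, the thread executing `run()` is not guaranteed to ever observe the write made in `cancel()`, so the job might never shut down cleanly.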
Main program
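import common.Product;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TableStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the Blink planner in streaming mode
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        DataStream<Product> source = bsEnv.addSource(new ProductStremingSource());

        // Split the stream with side outputs: even ids vs. odd ids
        final OutputTag<Product> even = new OutputTag<Product>("even") {};
        final OutputTag<Product> odd = new OutputTag<Product>("odd") {};
        SingleOutputStreamOperator<Product> sideOutputData = source.process(new ProcessFunction<Product, Product>() {
            @Override
            public void processElement(Product value, Context ctx, Collector<Product> out) throws Exception {
                if (value.getId() % 2 == 0) {
                    ctx.output(even, value);
                } else {
                    ctx.output(odd, value);
                }
            }
        });
        DataStream<Product> evenStream = sideOutputData.getSideOutput(even);
        DataStream<Product> oddStream = sideOutputData.getSideOutput(odd);

        // Register two tables, evenTable and oddTable; the POJO field seasonType is exposed as column "name"
        bsTableEnv.registerDataStream("evenTable", evenStream, "seasonType as name, id");
        bsTableEnv.registerDataStream("oddTable", oddStream, "seasonType as name, id");

        // Run the SQL query; the result is a Table
        Table queryTable = bsTableEnv.sqlQuery(
                "select a.id, a.name, b.id, b.name from evenTable as a join oddTable as b on a.name = b.name");
        queryTable.printSchema();

        // Convert the Table back into a retract DataStream (fields mapped to Tuple4 by position) and print it
        DataStream<Tuple2<Boolean, Tuple4<Integer, String, Integer, String>>> dataStream =
                bsTableEnv.toRetractStream(queryTable,
                        TypeInformation.of(new TypeHint<Tuple4<Integer, String, Integer, String>>() {}));
        dataStream.print();

        bsEnv.execute("demo");
    }
}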
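To make the semantics of this streaming join concrete, here is a simplified, Flink-free model of what the job computes: each record is routed by id parity, as the two side outputs do, and is then inner-joined on `name` against every record seen so far on the other side. This is a textbook symmetric hash join written for illustration (the class `EvenOddJoinSketch` and its `onRecord` method are hypothetical); it is not Flink's actual join operator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the demo: route by id parity, then inner-join both sides on name.
// A textbook symmetric hash join, not Flink's implementation.
public class EvenOddJoinSketch {
    // Join state: name -> ids seen so far on each side (never cleaned up in this sketch).
    private final Map<String, List<Integer>> evenState = new HashMap<>();
    private final Map<String, List<Integer>> oddState = new HashMap<>();

    // Processes one record and returns the new result rows as "a.id,a.name,b.id,b.name".
    public List<String> onRecord(int id, String name) {
        boolean isEven = id % 2 == 0;
        Map<String, List<Integer>> own = isEven ? evenState : oddState;
        Map<String, List<Integer>> other = isEven ? oddState : evenState;
        own.computeIfAbsent(name, k -> new ArrayList<>()).add(id);

        // Probe the other side's state for records with the same name.
        List<String> out = new ArrayList<>();
        for (int otherId : other.getOrDefault(name, Collections.emptyList())) {
            int evenId = isEven ? id : otherId;   // columns a.* come from evenTable
            int oddId = isEven ? otherId : id;    // columns b.* come from oddTable
            out.add(evenId + "," + name + "," + oddId + "," + name);
        }
        return out;
    }
}
```

For example, feeding `(2, "summer")` yields nothing, then `(3, "summer")` yields `2,summer,3,summer`, and `(4, "summer")` yields `4,summer,3,summer`. Since both inputs are append-only and the join is an inner join, each record only adds rows, and the final set of rows matches what a batch join over the complete inputs would return. The state maps grow with every record; Flink's regular join likewise keeps both inputs in state, which is worth keeping in mind for long-running jobs.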
Output
The job prints pairs of elements, one from evenTable and one from oddTable, that have the same name.
Summary
This post briefly introduced Flink's Table API & SQL and implemented an example that joins two tables.
More articles: www.ipooli.com