热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

几个flinksql例子

一、

一、在这里插入图片描述
二、

在这里插入代import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;

import java.util.Properties;

public class LookUpAsyncTest {

@Test
public void test() throws Exception {
LookUpAsyncTest.main(new String[]{});
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

final ParameterTool params = ParameterTool.fromArgs(args);
String fileName;
if (params.get("f") != null)
fileName = params.get("f");
else
fileName = "/flink/userClick_Random_100W";

DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");

TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
String[] fields = new String[]{"id", "user_click", "time"};
RowTypeInfo typeInformation = new RowTypeInfo(types, fields);

DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
private static final long serialVersionUID = 2349572544179673349L;

@Override
public Row map(String s) {
String[] split = s.split(",");
Row row = new Row(split.length);
for (int i = 0; i < split.length; i++) {
Object value = split[i];
if (types[i].equals(Types.STRING)) {
value = split[i];
}
if (types[i].equals(Types.LONG)) {
value = Long.valueOf(split[i]);
}
row.setField(i, value);
}
return row;
}
}).returns(typeInformation);

tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");

RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()
.withFieldNames(new String[]{"id", "name"})
.withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})
.build();
tableEnv.registerTableSource("info", tableSource);

String sql = "select t1.id,t1.user_click,t2.name" +
" from user_click_name as t1" +
" join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +
" on t1.id = t2.id";

Table table = tableEnv.sqlQuery(sql);

DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);

// result.print().setParallelism(1);

DataStream<String> printStream = result.map(new MapFunction<Row, String>() {
@Override
public String map(Row value) throws Exception {
return value.toString();
}
});

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "172.16.12.148:9094");
FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(// broker list
"user_click_name", // target topic
new SimpleStringSchema(),
properties);
printStream.addSink(kafkaProducer);

tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());
}
}

三、

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class TableSQL {
public static void main(String[] args) throws Exception{
//1\. 获取上下文环境 table的环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
//2\. 读取score.csv
DataSet<String> input = env.readTextFile("score.csv");
input.print();
DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
@Override
public PlayerData map(String s) throws Exception {
String[] split = s.split(",");
return new PlayerData(String.valueOf(split[0]),
String.valueOf(split[1]),
String.valueOf(split[2]),
Integer.valueOf(split[3]),
Double.valueOf(split[4]),
Double.valueOf(split[5]),
Double.valueOf(split[6]),
Double.valueOf(split[7]),
Double.valueOf(split[8])
);
}
});
//3\. 注册成内存表
Table topScore = tableEnv.fromDataSet(topInput);
tableEnv.registerTable("score", topScore);
//4\. 编写sql 然后提交执行
//select player, count(season) as num from score group by player order by num desc;
Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3");
//5\. 结果进行打印
DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();
}
public static class PlayerData {
/**
* 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
*/

public String season;
public String player;
public String play_num;
public Integer first_court;
public Double time;
public Double assists;
public Double steals;
public Double blocks;
public Double scores;
public PlayerData() {
super();
}
public PlayerData(String season,
String player,
String play_num,
Integer first_court,
Double time,
Double assists,
Double steals,
Double blocks,
Double scores
) {
this.season = season;
this.player = player;
this.play_num = play_num;
this.first_court = first_court;
this.time = time;
this.assists = assists;
this.steals = steals;
this.blocks = blocks;
this.scores = scores;
}
}
public static class Result {
public String player;
public Long num;
public Result() {
super();
}
public Result(String player, Long num) {
this.player = player;
this.num = num;
}
@Override
public String toString() {
return player + ":" + num;
}
}
}//

当然我们也可以自定义一个 Sink,将结果输出到一个文件中,例如:

TableSink sink = new CsvTableSink("/home/result.csv", ",");
String[] fieldNames = {"name", "num"};
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
sqlQuery.insertInto("result");
env.execute();

然后我们运行程序,可以看到 /home 目录下生成的 result.csv,查看结果:

迈克尔-乔丹,10
凯文-杜兰特,4
阿伦-艾弗森,4

https://blog.csdn.net/u013411339/article/details/93267838
https://blog.csdn.net/u012554509/article/details/100533749


推荐阅读
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 本文探讨了利用Java实现WebSocket实时消息推送技术的方法。与传统的轮询、长连接或短连接等方案相比,WebSocket提供了一种更为高效和低延迟的双向通信机制。通过建立持久连接,服务器能够主动向客户端推送数据,从而实现真正的实时消息传递。此外,本文还介绍了WebSocket在实际应用中的优势和应用场景,并提供了详细的实现步骤和技术细节。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • 全面解析JavaScript代码注释技巧与标准规范
    在Web前端开发中,JavaScript代码的可读性和维护性至关重要。本文将详细介绍如何有效地使用注释来提高代码的可读性,并探讨JavaScript代码注释的最佳实践和标准规范。通过合理的注释,开发者可以更好地理解和维护复杂的代码逻辑,提升团队协作效率。 ... [详细]
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 本文深入探讨了CGLIB BeanCopier在Bean对象复制中的应用及其优化技巧。相较于Spring的BeanUtils和Apache的BeanUtils,CGLIB BeanCopier在性能上具有显著优势。通过详细分析其内部机制和使用场景,本文提供了多种优化方法,帮助开发者在实际项目中更高效地利用这一工具。此外,文章还讨论了CGLIB BeanCopier在复杂对象结构和大规模数据处理中的表现,为读者提供了实用的参考和建议。 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 本文详细介绍了在CentOS 6.5 64位系统上使用阿里云ECS服务器搭建LAMP环境的具体步骤。首先,通过PuTTY工具实现远程连接至服务器。接着,检查当前系统的磁盘空间使用情况,确保有足够的空间进行后续操作,可使用 `df` 命令进行查看。此外,文章还涵盖了安装和配置Apache、MySQL和PHP的相关步骤,以及常见问题的解决方法,帮助用户顺利完成LAMP环境的搭建。 ... [详细]
  • 在Eclipse中批量转换Java源代码文件的编码格式从GBK到UTF-8是一项常见的需求。通过编写简单的Java代码,可以高效地实现这一任务。该方法不仅适用于Java文件,还可以用于其他类型的文本文件编码转换。具体实现可以通过导入`java.io.File`类来操作文件系统,从而完成批量转换。此外,建议在转换过程中添加异常处理机制,以确保代码的健壮性和可靠性。 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
author-avatar
BOSS
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有