热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkSockettablesource/sink

最近想起刚做实时数仓时候的项目架构,如下:从架构图可以看到,数仓是基于Flink和Kafka的实时能力做的,同时每层的结果会存一份到HBase(公司特点方便查数)那时候就想弄个支持

最近想起刚做实时数仓时候的项目架构,如下:

从架构图可以看到,数仓是基于 Flink 和 Kafka 的实时能力做的,同时每层的结果会存一份到 HBase(公司特点方便查数)

那时候就想弄个支持多路输出的 Table Sink,一直没有动手。现在先来个简单的 SocketTableSink 练练手

之前搞 Table Source 的时候,已经实现了 Socke Table Source(从官网抄的),这次刚好一起凑成一组 Table Source/Sink

先来个架构图:

类图:

SocketDynamicTableFactory 实现 Flink 的 DynamicTableSourceFactory, DynamicTableSinkFactory

SocketDynamicTableSource 实现 Flink 的 ScanTableSource, ScanTableSource 实现了 DynamicTableSource

SocketDynamicTableSink 实现 Flink 的 DynamicTableSink

SocketSourceFunction 继承 Flink 的 RichSourceFunction

SocketSinkFunction 继承 RichSinkFunction


代码


SocketDynamicTableFactory

定义了connector 标识,和 Socket table source/sink 必选、可选的参数,如: hostname、port 等

@Override
public String factoryIdentifier() {
return "socket"; // used for matching to `cOnnector= '...'`
}
// define all options statically
public static final ConfigOption HOSTNAME = ConfigOptions.key("hostname")
.stringType()
.noDefaultValue();
public static final ConfigOption PORT = ConfigOptions.key("port")
.intType()
.noDefaultValue();

实现了 createDynamicTableSource、createDynamicTableSink 方法,解析 Catalog 中的参数,根据 format 类型选择合适的序列化器或反序列化器,最后创建 SocketDynamicTableSource、SocketDynamicTableSink

/**
* create socket table source
* @param context
* @return
*/
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig optiOns= helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(MAX_RETRY);
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
/**
* create socket table sink
* @param context
* @return
*/
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final EncodingFormat> encodingFormat = helper.discoverEncodingFormat(
SerializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig optiOns= helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(PORT);
final int maxRetry = options.get(MAX_RETRY);
final long retryInterval = options.get(RETRY_INTERVAL) * 1000;
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
Preconditions.checkState(maxRetry > 0, "max retry time max greater than 0, current : ", maxRetry);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}

SocketDynamicTableSource

用 DecodingFormat 创建对应的反序列化器,创建 Source Function


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create runtime classes that are shipped to the cluster
// create deserializer
final DeserializationSchema deserializer = decodingFormat.createRuntimeDecoder(
runtimeProviderContext,
producedDataType);
final SourceFunction sourceFunction = new SocketSourceFunction(hostname, port, byteDelimiter, deserializer);
return SourceFunctionProvider.of(sourceFunction, false);
}

SocketSourceFunction

从 socket 中获取输入流,反序列化数据,输出到下游


@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning) {
// open and consume from socket
try (final Socket socket = new Socket()) {
currentSocket = socket;
socket.connect(new InetSocketAddress(hostname, port), 0);
try (InputStream stream = socket.getInputStream()) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b;
while ((b = stream.read()) >= 0) {
// buffer until delimiter
if (b != byteDelimiter) {
buffer.write(b);
}
// decode and emit record
else {
ctx.collect(deserializer.deserialize(buffer.toByteArray()));
buffer.reset();
}
}
}
} catch (Throwable t) {
t.printStackTrace(); // print and continue
}
Thread.sleep(1000);
}
}

SocketDynamicTableSink

和 SocketDynamicTableSource 类似,用 EncodingFormat 创建序列化器,创建 SocketSinkFunction


@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final SerializationSchema serializer = encodingFormat.createRuntimeEncoder(context, producedDataType);
SocketSinkFunction sink = new SocketSinkFunction(hostname, port, serializer, maxRetry, retryInterval);
return SinkFunctionProvider.of(sink);
}

SocketSinkFunction

将数据序列化后,写入到 socket 的输出流


@Override
public void invoke(RowData element, Context context) throws Exception {
int retryTime = 0;
byte[] message = serializer.serialize(element);
while (retryTime <= maxRetry) {
try {
os.write(message);
os.flush();
return;
} catch (Exception e) {
Thread.sleep(retryInterval);
++retryTime;
reconnect();
// LOG.warn("send data error retry: {}", retryTime);
}
}
LOG.warn("send error after retry {} times, ignore it: {}", maxRetry, new String(message));
}

测试

SQL 读 socket 写 socket:

-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,ts TIMESTAMP(3)
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket'
,'hostname' = 'thinkpad'
,'port' = '12345'
,'format' = 'json'
-- ,'format' = 'csv'
);
-- set table.sql-dialect=hive;
-- kafka sink
drop table if exists socket_sink;
CREATE TABLE socket_sink (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts timestamp(3)
) WITH (
'connector' = 'socket'
,'hostname' = 'localhost'
,'max.retry' = '2'
-- ,'retry.interval' = '2'
,'port' = '12346'
,'format' = 'csv'
);
-- streaming sql, insert into mysql table
insert into socket_sink
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;

先打开两个端口:

nc -lk localhost 12345
nc -lk localhost 12346

输入数据:

输出:



  • 注: 要先打开输入、输出的端口,不然连不上端口,source 和sink 都要报错

完整示例参考: github sqlSubmit

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文



推荐阅读
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 使用圣杯布局模式实现网站首页的内容布局
    本文介绍了使用圣杯布局模式实现网站首页的内容布局的方法,包括HTML部分代码和实例。同时还提供了公司新闻、最新产品、关于我们、联系我们等页面的布局示例。商品展示区包括了车里子和农家生态土鸡蛋等产品的价格信息。 ... [详细]
  • IhaveconfiguredanactionforaremotenotificationwhenitarrivestomyiOsapp.Iwanttwodiff ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 成功安装Sabayon Linux在thinkpad X60上的经验分享
    本文分享了作者在国庆期间在thinkpad X60上成功安装Sabayon Linux的经验。通过修改CHOST和执行emerge命令,作者顺利完成了安装过程。Sabayon Linux是一个基于Gentoo Linux的发行版,可以将电脑快速转变为一个功能强大的系统。除了作为一个live DVD使用外,Sabayon Linux还可以被安装在硬盘上,方便用户使用。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
author-avatar
谁的围脖搞笑排行榜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有