热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkSockettablesource/sink

最近想起刚做实时数仓时候的项目架构,如下:从架构图可以看到,数仓是基于Flink和Kafka的实时能力做的,同时每层的结果会存一份到HBase(公司特点方便查数)那时候就想弄个支持

最近想起刚做实时数仓时候的项目架构,如下:

从架构图可以看到,数仓是基于 Flink 和 Kafka 的实时能力做的,同时每层的结果会存一份到 HBase(公司特点方便查数)

那时候就想弄个支持多路输出的 Table Sink,一直没有动手。现在先来个简单的 SocketTableSink 练练手

之前搞 Table Source 的时候,已经实现了 Socke Table Source(从官网抄的),这次刚好一起凑成一组 Table Source/Sink

先来个架构图:

类图:

SocketDynamicTableFactory 实现 Flink 的 DynamicTableSourceFactory, DynamicTableSinkFactory

SocketDynamicTableSource 实现 Flink 的 ScanTableSource, ScanTableSource 实现了 DynamicTableSource

SocketDynamicTableSink 实现 Flink 的 DynamicTableSink

SocketSourceFunction 继承 Flink 的 RichSourceFunction

SocketSinkFunction 继承 RichSinkFunction


代码


SocketDynamicTableFactory

定义了connector 标识,和 Socket table source/sink 必选、可选的参数,如: hostname、port 等

@Override
public String factoryIdentifier() {
return "socket"; // used for matching to `cOnnector= '...'`
}
// define all options statically
public static final ConfigOption HOSTNAME = ConfigOptions.key("hostname")
.stringType()
.noDefaultValue();
public static final ConfigOption PORT = ConfigOptions.key("port")
.intType()
.noDefaultValue();

实现了 createDynamicTableSource、createDynamicTableSink 方法,解析 Catalog 中的参数,根据 format 类型选择合适的序列化器或反序列化器,最后创建 SocketDynamicTableSource、SocketDynamicTableSink

/**
* create socket table source
* @param context
* @return
*/
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig optiOns= helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(MAX_RETRY);
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
/**
* create socket table sink
* @param context
* @return
*/
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final EncodingFormat> encodingFormat = helper.discoverEncodingFormat(
SerializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig optiOns= helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(PORT);
final int maxRetry = options.get(MAX_RETRY);
final long retryInterval = options.get(RETRY_INTERVAL) * 1000;
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
Preconditions.checkState(maxRetry > 0, "max retry time max greater than 0, current : ", maxRetry);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}

SocketDynamicTableSource

用 DecodingFormat 创建对应的反序列化器,创建 Source Function


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create runtime classes that are shipped to the cluster
// create deserializer
final DeserializationSchema deserializer = decodingFormat.createRuntimeDecoder(
runtimeProviderContext,
producedDataType);
final SourceFunction sourceFunction = new SocketSourceFunction(hostname, port, byteDelimiter, deserializer);
return SourceFunctionProvider.of(sourceFunction, false);
}

SocketSourceFunction

从 socket 中获取输入流,反序列化数据,输出到下游


@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning) {
// open and consume from socket
try (final Socket socket = new Socket()) {
currentSocket = socket;
socket.connect(new InetSocketAddress(hostname, port), 0);
try (InputStream stream = socket.getInputStream()) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b;
while ((b = stream.read()) >= 0) {
// buffer until delimiter
if (b != byteDelimiter) {
buffer.write(b);
}
// decode and emit record
else {
ctx.collect(deserializer.deserialize(buffer.toByteArray()));
buffer.reset();
}
}
}
} catch (Throwable t) {
t.printStackTrace(); // print and continue
}
Thread.sleep(1000);
}
}

SocketDynamicTableSink

和 SocketDynamicTableSource 类似,用 EncodingFormat 创建序列化器,创建 SocketSinkFunction


@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final SerializationSchema serializer = encodingFormat.createRuntimeEncoder(context, producedDataType);
SocketSinkFunction sink = new SocketSinkFunction(hostname, port, serializer, maxRetry, retryInterval);
return SinkFunctionProvider.of(sink);
}

SocketSinkFunction

将数据序列化后,写入到 socket 的输出流


@Override
public void invoke(RowData element, Context context) throws Exception {
int retryTime = 0;
byte[] message = serializer.serialize(element);
while (retryTime <= maxRetry) {
try {
os.write(message);
os.flush();
return;
} catch (Exception e) {
Thread.sleep(retryInterval);
++retryTime;
reconnect();
// LOG.warn("send data error retry: {}", retryTime);
}
}
LOG.warn("send error after retry {} times, ignore it: {}", maxRetry, new String(message));
}

测试

SQL 读 socket 写 socket:

-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,ts TIMESTAMP(3)
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket'
,'hostname' = 'thinkpad'
,'port' = '12345'
,'format' = 'json'
-- ,'format' = 'csv'
);
-- set table.sql-dialect=hive;
-- kafka sink
drop table if exists socket_sink;
CREATE TABLE socket_sink (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts timestamp(3)
) WITH (
'connector' = 'socket'
,'hostname' = 'localhost'
,'max.retry' = '2'
-- ,'retry.interval' = '2'
,'port' = '12346'
,'format' = 'csv'
);
-- streaming sql, insert into mysql table
insert into socket_sink
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;

先打开两个端口:

nc -lk localhost 12345
nc -lk localhost 12346

输入数据:

输出:



  • 注: 要先打开输入、输出的端口,不然连不上端口,source 和sink 都要报错

完整示例参考: github sqlSubmit

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文



推荐阅读
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • 本文介绍了在 Java 编程中遇到的一个常见错误:对象无法转换为 long 类型,并提供了详细的解决方案。 ... [详细]
  • 深入解析 Lifecycle 的实现原理
    本文将详细介绍 Android Jetpack 中 Lifecycle 组件的实现原理,帮助开发者更好地理解和使用 Lifecycle,避免常见的内存泄漏问题。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 单片微机原理P3:80C51外部拓展系统
      外部拓展其实是个相对来说很好玩的章节,可以真正开始用单片机写程序了,比较重要的是外部存储器拓展,81C55拓展,矩阵键盘,动态显示,DAC和ADC。0.IO接口电路概念与存 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • 思科IOS XE与ISE集成实现TACACS认证配置
    本文详细介绍了如何在思科IOS XE设备上配置TACACS认证,并通过ISE(Identity Services Engine)进行用户管理和授权。配置包括网络拓扑、设备设置和ISE端的具体步骤。 ... [详细]
  • Flutter中计算文本尺寸的方法
    在Flutter开发中,有时需要计算文本的宽度和高度。本文介绍了一种利用TextPainter类实现这一功能的方法。 ... [详细]
  • MySQL的查询执行流程涉及多个关键组件,包括连接器、查询缓存、分析器和优化器。在服务层,连接器负责建立与客户端的连接,查询缓存用于存储和检索常用查询结果,以提高性能。分析器则解析SQL语句,生成语法树,而优化器负责选择最优的查询执行计划。这一流程确保了MySQL能够高效地处理各种复杂的查询请求。 ... [详细]
  • 浏览器作为我们日常不可或缺的软件工具,其背后的运作机制却鲜为人知。本文将深入探讨浏览器内核及其版本的演变历程,帮助读者更好地理解这一关键技术组件,揭示其内部运作的奥秘。 ... [详细]
  • 在C#编程中,设计流畅的用户界面是一项重要的任务。本文分享了实现Fluent界面设计的技巧与方法,特别是通过编写领域特定语言(DSL)来简化字符串操作。我们探讨了如何在不使用`+`符号的情况下,通过方法链式调用来组合字符串,从而提高代码的可读性和维护性。文章还介绍了如何利用静态方法和扩展方法来实现这一目标,并提供了一些实用的示例代码。 ... [详细]
author-avatar
谁的围脖搞笑排行榜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有