热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

关于flink:基于-FFI-的-PyFlink-下一代-Python-运行时介绍

摘要:本文整顿自阿里巴巴高级开发工程师黄兴勃(断尘)在FlinkForwardAisa2021核心技术专场的演讲。次要内容包含:PyFlink最新性能PyFlinkRuntime基

摘要:本文整顿自阿里巴巴高级开发工程师黄兴勃 (断尘) 在 Flink Forward Aisa 2021 核心技术专场的演讲。次要内容包含:

  1. PyFlink 最新性能
  2. PyFlink Runtime
  3. 基于 FFI 的 PEMJA
  4. PyFlink Runtime 2.0
  5. Future Work

点击查看直播回放 & 演讲PDF

原 Flink Forward Asia 2021 演讲中的 JCP 我的项目已改名为 PEMJA,并且于 2022 年 1 月 14 日正式开源,开源地址为:

https://github.com/alibaba/pemja

Ps: JCP 已在本文替换为 PEMJA。

一、PyFlink 新性能

PyFlink 1.14 新增了很多性能,次要分为性能、易用性和性能三个方面。

性能方面,新增了 State TTL config。在 1.14 以前曾经实现了 Python Datastream API 以及一些操作 State 上的性能,然而并没有提供 State TTL config 的配置,这也意味着用户写 Python Datastream API 的自定义函数时无奈主动把State的值清掉,而是须要手动的操作,对用户不够敌对。

易用性方面,次要新增了以下几项性能:

  • 在依赖治理局部反对了 tar.gz 格局。
  • Profile 性能。用户写 PyFlink 会用到一些 Python 的自定义函数,但并不分明这部分函数的性能瓶颈在哪里。而有了 profile 性能之后,Python 函数呈现性能瓶颈时,便能够通过 profile 剖析它的瓶颈具体是由起因什么引起,从而能够针对这部分进行一些优化。
  • Print 性能。在 1.14 以前,打印自定义的 log 信息必须应用 Python 自定义的 logging 模块。但对于 Python 用户来说,print 是他们比拟习惯应用的一种输入日志信息的形式。所以在 1.14 增加上了这部分性能。
  • Local Debug 模式。在 1.14 以前,用户如果应用 Python 自定义的函数在本地开发 PyFlink 作业,必须应用 remote debug 形式调试自定义逻辑,但它应用起来绝对比拟繁琐,而且应用门槛较高。在 1.14 扭转了这种模式,如果在本地编写一个 PyFlink 作业应用了 Python 自定义函数,能够主动切到 local debug 模式,能够在 ide 外面间接 debug 自定义 Python 函数。

性能方面次要新增了以下性能:

  • Operator Fusion。这个性能次要针对在 Python Datastream API 的作业中做间断几个算子操作的场景。比方两次 .map 操作,在 1.14 以前,这两个 .map 会别离运行在两个 Python worker 中,而实现了 Operator Fusion 后,它们会被 merge 并运行在同一个 operator 中,而后由 Python worker 执行总的后果,达到了很好的性能优化。
  • State 序列化/反序列化优化。在 1.14 以前,State 序列化/反序列化优化是应用 Python 内置的序列化器 pickle,它能序列化各种 Python 自定义的数据结构,但须要把 State type 的信息序列化到数据结构中,这会导致序列化的构造领会更大。1.14 中对其进行了优化,应用了自定义的序列化器,一个 type 对应一个序列化器来做优化,使得序列化信息更小。
  • Finish Bundle 优化。在 1.14 以前 Finish Bundle 是同步的操作,现在把它改成了异步的操作,进步了它的性能,而且能解决一些 Checkpoint 无奈实现的场景。

二、PyFlink Runtime

上图是 PyFlink 现有的框架图。

图左侧的最上方的 Python Table API & SQL 和 Datastream API 是提供给用户的 Python API。用户通过这两个 Python API 编写 PyFlink 作业,再通过一个 py4j 的三方库把 Python API 转换成 Java API,即可对应到 Flink Java API 来形容这个作业。

针对 Table 和 SQL 的作业有个额定的 optimizer,它有两种 rule,一种是常见的 common rules,另一种是 Python rules。这里为什么会有 Python rules?家喻户晓,common rules 针对各种 Table 和 SQL 现有的作业都是无效的,而 Python rules 做的优化是针对 PyFlink 作业中应用了自定义的 Python 函数的场景,可能把对应的 operator 抽取进去。

形容完了作业之后,它会被翻译成一个 jobgraph,外面有对应的 Python operators。Python operators 形容的 jobgraph 会提交到 TM (Runtime) 下来运行, Runtime 中也有个 Python operators。

图右侧是 Python operators 的各种组件,形容了 PyFlink Runtime 最外围的局部。次要分为两个局部:Java operator 和 Python worker。

Java operator 中它有很多个组件,包含 data service 和 State service,以及针对 checkpoint、watermark 和 State request 的一些解决。因为自定义 Python 函数无奈间接运行在 Flink 现有的架构之上,Flink 现有的架构是基于 JVM 的,然而编写 Python 函数须要一个 Python Runtime,所以用 operator worker 来解决这个问题。

解决方案如下:发动一个 Python 过程运行 Python 自定义的函数,同时应用 Java operator 解决上游来的数据,再通过非凡解决之后发送给对应的 Python worker。这里应用的是过程间通信的计划,也就是图中的 data service。State service 针对 Python Datastream API 对 State 的操作,通过在 Python 里操作 State,数据会从 Python worker 返回到 Java operator,Java operator 再通过拜访 State backend 拿到对应的 State 数据,并回传给 Python worker,最初用户就能够操作 State 的后果了。

上图是 PyFlink Runtime Workflow。外面的角色别离是 Python operator、Python runner、bundle processor、coder、Python operation,这几个不同的角色运行在不同的中央。其中 Python operator 和 Python runner 是运行在 Java JVM 里,负责对接上游和上游的 Java operator,而 bundle processor、coder 以及 Python operation 运行在 PVM 里,bundle processor 利用了现有的 Apache Bean 框架,可能接管来自于 Java Python 的数据,它们之间应用了过程间通信。coder 是在 Python 端的一个自定义的序列化器,Java 端发送了一条数据,通过 Python operator 发送给 Python runner,由 Python runner 进行序列化后,再通过过程间的通信发送给 bundle processor。bundle processor 再把序列化后的二进制数组通过 coder 将它反序列化并失去一个 Python 对象。最初通过 Python operation 把反序列化之后的 Python 参数作为一个函数体的入参,而后调用自定义的 Python 函数,失去自定义的后果。

上述流程的瓶颈次要存在以下几个方面:首先是计算端调用用户自定义函数以及在调用之前,存在框架层 Python 写的开销;其次是自定义序列化局部,在 Java 端和 Python 端都须要序列化和反序列化数据;第三局部是过程间的通信。

针对上述瓶颈,进行了一些列优化:

  • 计算方面,利用 codegen 将现有的 Python 调函数的变量全都改为常量,函数的执行效率会更高;另外,将现有的 Python operation 的实现全都改为 cython,相当于将 Python 转化为 .c 的实现形式,性能失去了大幅晋升;
  • 序列化方面,提供了自定义序列化器,全都是纯 c 的实现,比 Python 更高效。
  • 通信方面,目前暂未实现优化。
  • 序列化和通信的问题,实质上就是 Java 和 Python 相互调用的问题,也就是如何优化 PyFlink 的 Runtime 架构的问题。

三、基于 FFI 的 PEMJA

Java 和 Python 相互调用曾经是一个比拟通用的问题,目前也曾经有很多种实现计划。

第一种是过程间相互调用的计划,即网络通信的计划,包含以下几种:

  • socket 计划,它所有的通信协定都是通过本人实现,能够很灵便,然而比拟繁琐;
  • py4j 计划,即 PyFlink 和 PySpark 在客户端编写作业时都应用 py4j;
  • Alink 计划,它是在 Runtime 运行时应用 py4j,也有自定义的 Python 函数;grpc 计划,它利用现有的 grp service,不须要自定义的协定,有自定义的 service 和 message;
  • 此外,共享内存的计划也是另一种过程间通信的计划,比方 Tensorflow on Flink,它是通过共享内存的形式实现的。还有 PyArrow Plasma,也是一种对象式的共享内存存储。

上述计划都是针对过程间通信,那么是否让 Python 和 Java 运行在同一个过程里,从而齐全打消过程间通信带来的困扰?

的确有一些现有的库在这方面做了尝试,第一种计划是将 Python 转成 Java。比方 p2j 是把 Python 的 source code 转成 Java 的 source code,voc 是把 Python 代码间接转成 Java 的 bytecode,这种计划的实质就是将 Python 转成一套能够间接运行在 JVM 之上的代码。但这套计划也存在不小的缺点,因为 Python 是在一直地倒退,它有各种语法,而将 Python 语法映射到 Java 中对应的对象是很艰难的,它们毕竟是不同的语言。

第二种计划是基于 Java 实现的 Python 解释器。首先是 Jython 计划,Python 其实是用 c 语言写的一套 Python 解释器,c 写的 Python 解释器能够运行在 c 之上,那么 Java 实现的 Python 解释器也就能够间接运行在 JVM 之上。另外一种计划是 Graalvm,它提供了一种 truffle framework 的形式,能够反对各种编程语言应用独特的构造,这种构造能运行在 JVM 之上,也就能够让各种语言运行在同一个过程里。

上述计划实现的前提是可能辨认 Python code,也就意味着要能兼容现有的各种 Python code,然而目前来看,兼容是一个难以解决的问题,因而也就阻止了这套 Python 转成 Java 计划持续推广的可能性。

第三种是基于 FFI 的一套计划。

FFI 的实质就是 host language 如何调用一个 guest language,即 Java 与 Python 之间的相互调用,对应的具体实现计划有很多种。

Java 提供了 JNI (Java native interface),让 Java 用户可能通过 JNI 的接口调用 c 实现的一些 lib,反过来也同样实用。有了这套接口之后, JVM 的厂商就会依据这套接口去实现 JNI,从而实现 Java 与 c 之间的相互调用。

Python/C API 也是相似的, Python 是一套 c 实现的解释器,因而能很好地反对 Python 代码调用 c 的三方库,反之也同样实用。

Cython 提供了一个工具,可能将 source code 转换成另一种语言能辨认的代码。比方将 Python 代码转换成一套十分高效的 c 语言代码,再嵌入到 cPython 解释器中即可间接运行,十分高效。

Ctypes 是通过将 c 的 library 封装起来,使得 Python 能高效地调用 c 的 library。

上述提到基于 FFI 的计划的外围就是 c。有了 c 这个桥梁之后,一个 Java 写成的代码,通过 JNI 接口就能调用到 c,而后由 c 去调用 cPython API 的接口,最终实现 Java 和 Python 运行在同一个线程里,这就是 PEMJA 的整体思路。解决了过程间通信的问题,以及因为它自身是应用的是本人提供的 Python/C API,也就不存在兼容性的问题,克服了 Java 实现解释器的缺点。

上图展现了基于这套思维的几种实现,但这几种实现都或多或少存在一些问题。

JPype 解决的问题是 Python 调用 Java 的问题,不反对 Java 调用 Python,所以它并不实用这个场景。

JEP 实现了 Java 调用 Python,但它的具体实现存在很多限度,一是只能用源码装置,对环境的要求十分高,以及它须要依赖 cPython 三方的一些 .source 文件,十分不利于跨平台的装置应用。JEP 的启动入口必须是JEP的程序,须要动静加载类库,必须提前在环境变量中设置好,十分不利于它作为一个第三方的中间件插件运行在另一个架构上。此外还有性能上问题,它没有很好地克服现有的 Python GIL 的问题,所以导致它的性能并不是那么高效。

而 PEMJA 根本克服了上述问题,更好的实现了 Java 和 Python 相互调用。

上图是几种框架的性能比照。这里应用了一个比拟规范简略的 String upper 函数。这里次要比拟的是框架层的开销,并不是自定义函数的性能,所以应用了一个最简略的函数。同时,思考到现有的各种函数最罕用的数据结构是 String,所以这里应用了 String。

这里别离比照的是 100 个 bytes 和 1000 个 bytes 在这 4 种解释器下的性能,能够看到 Jython 并没有像设想中那么高效,反而是这 4 种实现计划中性能最低的。JEP 的性能也远远比不上 PEMJA,PEMJA 在 100 bytes 的时候大略是纯 Java 实现的 40%,1000 bytes 的状况下性能竟然超过了纯 Java 的实现。

如何解释这个景象呢?String upper 自身是一套 Java 的实现,而在 Python 中它是 .c 的实现,函数自身的执行效率比 Java 高,再联合框架开销足够小的状况,整体的性能反而比 Java 更高,也就意味着在某些场景下,Python UDF 的性能是有可能超过 Java UDF 的。

当初很多用户应用 Java UDF 而不应用 Python UDF 的一个关键点是 Python UDF 性能远远比不上 Java。然而如果 Java 的性能并没有比 Python 更好的话,Python 反而就有了劣势,因为它毕竟是一种脚本语言,写起来是更不便。

上图展现了 PEMJA 的架构。

Java 中的 damond thread 负责初始化以及最初的销毁以及在 PEMJA 和对应的 Python PVM 里创立及开释资源。用户应用的是 Java 中的 PEMJA 实例,实例映射到 PEMJA 中对应 PEMJA 的 instance,instant 会创立每一个 Python 的 sub interpreter。Python double interpreter 绝对于全局 Python interpreter,是一个更小的可能掌控 GIL 的概念,它有本人独立的 hip 空间,所以可能实现命名空间的隔离。这里的每一个 thread 都会对应一个 Python sub interpret,能够在对应的 PVM 里执行本人的 Python function。

四、PyFlink Runtime 2.0

PyFlink Runtime 2.0 就是基于 PEMJA 做的。

上图右边是 PyFlink 1.0 的架构。外面有两个过程,一个是 Java 过程,一个是 Python 过程。它们之间的数据交互是通过 data service 和 State service 实现,应用了过程 IPC 通信。

有了 PEMJA 之后,就能够把 data service 和 State service 替换成 PEMJA Lib,随即能够把右边原来的 JVM 和左边的 PVM 运行在同一个过程里,从而彻底解掉的 IPC 过程通信的问题。

上图将现有的 PyFlink UDF、PyFlink 基于 PEMJA 的一套 UDF 以及 Java UDF 做了性能比照。也是应用 String upper 函数,比拟 100 bytes 和 1000 bytes 的性能。能够看到,在 100 bytes 的状况下,UDF on PEMJA 的实现曾经根本达到 Java UDF 的 50% 的性能。在 1000 bytes 的状况下,UDF on PEMJA 的性能曾经超过了 Java UDF。尽管这和实现了自定义的函数无关,但也能阐明这套 PEMJA 框架的性能之高效。

五 、Future Work

将来,会开源 PEMJA 框架 (已于 2022 年 1 月 14 日正式开源),因为它波及到通用的解决方案,不仅仅是使用在 PyFlink 之上,各种 Java 和 Python 相互调用的计划也都能够利用这套框架,所以会对 PEMJA 框架做一个独立的开源。它的第一个版本临时只反对 Java 调用 Python 性能,后续会反对 Python 调用 Java 的性能,因为 Python Datastream API 用 Python 写的函数调用 State 是依赖于 Python 调用 Java 的性能。此外,将实现 PEMJA 反对 Numpy 原生数据结构,实现了这个反对之后,pandas UDF 也就得以使用,性能将会失去质的飞跃。

欢送大家退出 “PyFlink 交换群”,交换 PyFlink 相干的问题。


Flink CDC Meetup · Online

工夫:5 月 21 日 9:00-12:25

PC 端直播观看:https://developer.aliyun.com/…

挪动端倡议微信扫一扫关注 ApacheFlink 视频号预约观看:

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

流动举荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启流动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…


推荐阅读
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • 20211101CleverTap参与度和分析工具功能平台学习/实践
    1.应用场景主要用于学习CleverTap的使用,该平台主要用于客户保留与参与平台.为客户提供价值.这里接触到的原因,是目前公司用到该平台的服务~2.学习操作 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 十大经典排序算法动图演示+Python实现
    本文介绍了十大经典排序算法的原理、演示和Python实现。排序算法分为内部排序和外部排序,常见的内部排序算法有插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、基数排序等。文章还解释了时间复杂度和稳定性的概念,并提供了相关的名词解释。 ... [详细]
  • 开源Keras Faster RCNN模型介绍及代码结构解析
    本文介绍了开源Keras Faster RCNN模型的环境需求和代码结构,包括FasterRCNN源码解析、RPN与classifier定义、data_generators.py文件的功能以及损失计算。同时提供了该模型的开源地址和安装所需的库。 ... [详细]
  • 花瓣|目标值_Compose 动画边学边做夏日彩虹
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Compose动画边学边做-夏日彩虹相关的知识,希望对你有一定的参考价值。引言Comp ... [详细]
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
author-avatar
mobiledu2502857407
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有