热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ApacheBeam的DockerDemo

文章目录1Overview2Docker部署Flink&Beam2.2BeamFlink3Summary1Overview参考文章:https:medium.com0x0ecea-


文章目录

  • 1 Overview
  • 2 Docker 部署 Flink & Beam
    • 2.2 Beam Flink
  • 3 Summary


1 Overview


参考文章: https://medium.com/@0x0ece/a-quick-demo-of-apache-beam-with-docker-da98b99a502a


Apache Beam 是什么?

Apache Beam 是统一的批/流数据处理的编程模型。本文主要是参考官方文档,用 Docker 来快速跑起来一个用 Beam 来构建的 Flink 程序来处理数据的 Demo。


2 Docker 部署 Flink & Beam

首先利用 Docker Compose 来将 Flink Cluster 跑起来。

git clone https://github.com/ecesena/docker-beam-flink.git
cd docker-beam-flink

然后大家可以看看文件夹的树状结构。

➜ docker-beam-flink git:(master) tree
.
├── LICENSE
├── README.md
├── base
│ ├── Dockerfile
│ └── supervisor.conf
├── beam-flink
│ ├── Dockerfile
│ └── config-flink-load-jar.sh
├── build.sh
├── docker-compose.yml
├── flink
│ ├── Dockerfile
│ ├── conf
│ │ ├── flink-conf.yaml
│ │ ├── log4j.properties
│ │ ├── logback-yarn.xml
│ │ ├── logback.xml
│ │ └── slaves
│ └── config-flink.sh
└── screenshots└── showplan.png

从文件结构看,项目中包含了三个 Dockerfile,其依赖的顺序可以是 base/Dockerfile -> flink/Dockerfile -> beam-flink/Dockerfile。

base 中的 Dockerfile 是 Ubuntu 的基础镜像,这里就不分析了。剩下的逐一分析一下,分析写在里 Dockerfile 里。

flink

FROM base# add passless key to ssh
RUN ssh-keygen -f ~/.ssh/id_rsa -t rsa -N ''
RUN cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/*## 安装 Flink 1.0.3
RUN mkdir ~/downloads && cd ~/downloads && \wget -q -O - http://apache.mirrors.pair.com/flink/flink-1.0.3/flink-1.0.3-bin-hadoop26-scala_2.10.tgz | tar -zxvf - -C /usr/local/
RUN cd /usr/local && ln -s ./flink-1.0.3 flink# 设置 Dockerfile 的环境变量
ENV FLINK_HOME /usr/local/flink
ENV PATH $PATH:$FLINK_HOME/bin# 将 Flink 的一些配置放入镜像中
ADD conf/flink-conf.yaml /usr/local/flink/conf/
ADD config-flink.sh /usr/local/flink/bin/# 设置配置脚本的权限
RUN chmod +x /usr/local/flink/bin/config-flink.sh# 端口映射
EXPOSE 6123
EXPOSE 22CMD ["/usr/local/flink/bin/config-flink.sh", "taskmanager"]

beam-flink

# 从依赖的 flink 镜像开始构建镜像
FROM flink# 下载 beam-starter,可以先理解为一个预先写好的基于 Beam 的 Flink 作业
RUN curl -L https://github.com/ecesena/beam-starter/releases/download/v0.1/beam-starter-0.1.jar > /root/downloads/beam-starter-0.1.jar# 下载一段文本文件
RUN curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt# 将本地的文件复制到镜像的目录里
ADD config-flink-load-jar.sh /usr/local/flink/bin/# Flink 上传 jar 包的脚本
RUN chmod +x /usr/local/flink/bin/config-flink-load-jar.sh# 运行 taskmanager
CMD ["/usr/local/flink/bin/config-flink.sh", "taskmanager"]

以上 Dockerfile 其实很容易理解,就不赘述了。然后用 docker-compose 来运行 Flink。

docker-compose up -d

运行之后,可以看看 Docker 正在 Running 的容器就有了。

➜ docker-beam-flink git:(master) docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2de232e58df8 dataradiant/beam-flink "/usr/local/flink/bi…" 6 hours ago Up 6 hours 6121-6123/tcp, 0.0.0.0:32768->22/tcp docker-beam-flink_taskmanager_1
98b52be9c56e dataradiant/beam-flink "/usr/local/flink/bi…" 6 hours ago Up 6 hours 6123/tcp, 0.0.0.0:220->22/tcp, 0.0.0.0:48080->8080/tcp, 0.0.0.0:48081->8081/tcp docker-beam-flink_jobmanager_1

现在呢,我们基于上面的项目已经运行起来一个 Flink 集群,接下来,我们用 beam 的 Flink Runner 来跑起来一个 Flink 程序。


2.2 Beam Flink

打开 Flink 的 Web UI,然后在 Submit new Job 去提交作业。

image_1db7il89uuttlgedjt1br2vc71p.png-113.4kB

按照上图提示,提交的 jar 包是我们打镜像文件的时候打进去的。关于这个项目,我们可以先看看目录结构。

├── LICENSE
├── README.md
├── pom.xml
└── src├── main│ └── java│ └── com│ └── dataradiant│ └── beam│ ├── App.java│ └── examples│ ├── StreamWordCount.java│ └── WordCount.java└── test└── java└── com└── dataradiant└── beam└── AppTest.java

所以其实很容易理解,这个示例工程,其实就是基于 Beam 来创建的一个 Flink WordCount 程序而已。关于 WordCount 程序,核心代码如下。

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);// 选择 Flink 作为 Runner
options.setRunner(FlinkRunner.class);// 创建数据处理的 Pipeline
Pipeline p = Pipeline.create(options);p.apply("ReadLines", TextIO.Read.from(options.getInput()))// CountWords() ,其实就是计算词频的一个静态风法.apply(new CountWords())// 定义输出的格式.apply(MapElements.via(new FormatAsTextFn())).apply("WriteCounts", TextIO.Write.to(options.getOutput()));p.run();

3 Summary

本文就是一个具体的例子,展示了如何用 Beam 来构建 Flink 作业,并且用 Docker 来运行这个程序。


推荐阅读
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 用阿里云的免费 SSL 证书让网站从 HTTP 换成 HTTPS
    HTTP协议是不加密传输数据的,也就是用户跟你的网站之间传递数据有可能在途中被截获,破解传递的真实内容,所以使用不加密的HTTP的网站是不 ... [详细]
  • 在软件开发过程中,经常需要将多个项目或模块进行集成和调试,尤其是当项目依赖于第三方开源库(如Cordova、CocoaPods)时。本文介绍了如何在Xcode中高效地进行多项目联合调试,分享了一些实用的技巧和最佳实践,帮助开发者解决常见的调试难题,提高开发效率。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 本文深入探讨了如何利用Maven高效管理项目中的外部依赖库。通过介绍Maven的官方依赖搜索地址(),详细讲解了依赖库的添加、版本管理和冲突解决等关键操作。此外,还提供了实用的配置示例和最佳实践,帮助开发者优化项目构建流程,提高开发效率。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 深入解析Django CBV模型的源码运行机制
    本文详细探讨了Django CBV(Class-Based Views)模型的源码运行流程,通过具体的示例代码和详细的解释,帮助读者更好地理解和应用这一强大的功能。 ... [详细]
  • 本文详细介绍了 HTML 中 a 标签的 href 属性的多种用法,包括实现超链接、锚点以及调用 JavaScript 方法。通过具体的示例和解释,帮助开发者更好地理解和应用这些技术。 ... [详细]
  • 微软推出Windows Terminal Preview v0.10
    微软近期发布了Windows Terminal Preview v0.10,用户可以在微软商店或GitHub上获取这一更新。该版本在2月份发布的v0.9基础上,新增了鼠标输入和复制Pane等功能。 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • XAMPP 遇到 404 错误:无法找到请求的对象
    在使用 XAMPP 时遇到 404 错误,表示请求的对象未找到。通过详细分析发现,该问题可能由以下原因引起:1. `httpd-vhosts.conf` 文件中的配置路径错误;2. `public` 目录下缺少 `.htaccess` 文件。建议检查并修正这些配置,以确保服务器能够正确识别和访问所需的文件路径。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • C++ 开发实战:实用技巧与经验分享
    C++ 开发实战:实用技巧与经验分享 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 在探讨Hibernate框架的高级特性时,缓存机制和懒加载策略是提升数据操作效率的关键要素。缓存策略能够显著减少数据库访问次数,从而提高应用性能,特别是在处理频繁访问的数据时。Hibernate提供了多层次的缓存支持,包括一级缓存和二级缓存,以满足不同场景下的需求。懒加载策略则通过按需加载关联对象,进一步优化了资源利用和响应时间。本文将深入分析这些机制的实现原理及其最佳实践。 ... [详细]
author-avatar
jinyan胡_435
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有