热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink项目开发flink的scalashell命令行交互模式开发

flink的scalashell命令行交互模式开发flink带有一个集成的scalashell命令行。它可以以本地方式启动来模拟集群集群。执行下面的命令就可以通过shel
flink的 scala shell命令行交互模式开发

flink带有一个集成的scala shell命令行。它可以以本地方式启动来模拟集群集群。执行下面的命令就可以通过shell命令行和flink集群交互(这种方式方便于代码调试):

 

bin/start-scala-shell.sh local

如果想在集群上面运行scala shell,请查看本节后面的内容。

 

flink scala shell 用法

shell方式支持流处理和批处理。当启动shell命令行之后,两个不同的ExecutionEnvironments会被自动创建。使用benv和senv分别去处理批处理和流处理程序。(类似于spark-shell中sc变量)

DataSet API

下面的例子将会在scala shell中执行wordcount程序

Scala-Flink> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()

print()命令会自定发送指定的任务到jobmanager去执行,并且会将结果显示在控制台。

也可以吧结果写到一个文件中,然而,在这种情况下,你需要执行execute方法,去执行你的程序

Scala-Flink> benv.execute("MyProgram")

DataStream API

类似于上面的批处理程序,我们可以通过DataStream API执行一个流处理程序。

Scala-Flink> val textStreaming = senv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")

注意:在流处理情况下,print方法不会触发执行。需要调用execute方法才会真正执行。

flink shell会自动带有命令执行历史。

在shell命令行模式下添加外部依赖

可以将外部类路径添加到scala-shell中,当程序被调用的时候,这些外部依赖会自动的被发送到jobmanager上。

使用这个参数 -a 或者 --addclasspath  添加额外的依赖。

bin/start-scala-shell.sh [local | remote | yarn] --addclasspath

 

flink scala shell设置

查看scala shell模式提供的选型,可以执行这个命令:

bin/start-scala-shell.sh --help

 

本地模式local

 

使用shell连接一个本地集成的flink 集群 使用下面命令

bin/start-scala-shell.sh local

 

远程模式remote

 

使用scala shell 连接一个远程集群,使用host和port参数去连接指定的jobmanager

bin/start-scala-shell.sh remote

 

集群模式 yarn scala shell cluster

 

可以通过scala shell在yarn上启动一个专有的flink集群,yarn containers的数量可以通过参数-n 指定。shell在yarn上部署了一个新的集群并且连接到这个集群。你也可以指定集群的参数,例如:指定jobmanager的内存,yarn application的名称 等等。

例如:针对scala shell启动一个yarn集群包含两个taskmanager,使用下面的参数:

bin/start-scala-shell.sh yarn -n 2

针对所有的参数选项,可以在本节的最后查看完整的说明

yarn session模式

如果你之前已经使用flink yarn session模式启动了一个flink集群,scala shell可以使用下面的命令进行连接:

bin/start-scala-shell.sh yarn

 

完整的参数选项

 

Flink Scala Shell
用法: start-scala-shell.sh [local|remote|yarn] [options] ...
命令: local [options]
使用scala shell连接一个本地flink集群
-a | --addclasspath
指定flink使用的第三方依赖
命令: remote [options]
启动scala shell连接一个远程集群

主机名

端口号
-a | --addclasspath
指定flink使用的第三方依赖
命令: yarn [options]
使用flink连接一个yarn集群
-n arg | --container arg
分配的yarn container的数量 (等于TaskManagers的数量)
-jm arg | --jobManagerMemory arg
JobManager container 的内存[in MB]
-nm | --name
在YARN上给应用设置一个名字
-qu | --queue
指定YARN队列
-s | --slots
指定每个TaskManager的slot数量
-tm | --taskManagerMemory
TaskManager container的内存 [in MB]
-a | --addclasspath
指定flink使用的第三方jar
--configDir
配置文件目录.
-h | --help
打印帮助信息

 

获取更多大数据资料,视频以及技术交流请加群:

 

 


推荐阅读
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 技术分享:从动态网站提取站点密钥的解决方案
    本文探讨了如何从动态网站中提取站点密钥,特别是针对验证码(reCAPTCHA)的处理方法。通过结合Selenium和requests库,提供了详细的代码示例和优化建议。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • c# – UWP:BrightnessOverride StartOverride逻辑 ... [详细]
  • 本文详细介绍了如何使用 Yii2 的 GridView 组件在列表页面实现数据的直接编辑功能。通过具体的代码示例和步骤,帮助开发者快速掌握这一实用技巧。 ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 如何在U8系统中连接服务器并获取数据
    本文介绍了如何在U8系统中通过不同的方法连接服务器并获取数据,包括使用MySQL客户端连接实例的方法,如非SSL连接和SSL连接,并提供了详细的步骤和注意事项。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 优化ASM字节码操作:简化类转换与移除冗余指令
    本文探讨如何利用ASM框架进行字节码操作,以优化现有类的转换过程,简化复杂的转换逻辑,并移除不必要的加0操作。通过这些技术手段,可以显著提升代码性能和可维护性。 ... [详细]
  • 资源推荐 | TensorFlow官方中文教程助力英语非母语者学习
    来源:机器之心。本文详细介绍了TensorFlow官方提供的中文版教程和指南,帮助开发者更好地理解和应用这一强大的开源机器学习平台。 ... [详细]
  • Flink1.10定义UDAGG遇到SQL
    按照以下代码测试定义的UDAGG会一直出现org.apache.flink.table.api.ValidationException:SQLvalidationfailed.nu ... [详细]
author-avatar
流浪汉中的小男人
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有