热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink项目开发flink的scalashell命令行交互模式开发

flink的scalashell命令行交互模式开发flink带有一个集成的scalashell命令行。它可以以本地方式启动来模拟集群集群。执行下面的命令就可以通过shel
flink的 scala shell命令行交互模式开发

flink带有一个集成的scala shell命令行。它可以以本地方式启动来模拟集群集群。执行下面的命令就可以通过shell命令行和flink集群交互(这种方式方便于代码调试):

 

bin/start-scala-shell.sh local

如果想在集群上面运行scala shell,请查看本节后面的内容。

 

flink scala shell 用法

shell方式支持流处理和批处理。当启动shell命令行之后,两个不同的ExecutionEnvironments会被自动创建。使用benv和senv分别去处理批处理和流处理程序。(类似于spark-shell中sc变量)

DataSet API

下面的例子将会在scala shell中执行wordcount程序

Scala-Flink> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()

print()命令会自定发送指定的任务到jobmanager去执行,并且会将结果显示在控制台。

也可以吧结果写到一个文件中,然而,在这种情况下,你需要执行execute方法,去执行你的程序

Scala-Flink> benv.execute("MyProgram")

DataStream API

类似于上面的批处理程序,我们可以通过DataStream API执行一个流处理程序。

Scala-Flink> val textStreaming = senv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")

注意:在流处理情况下,print方法不会触发执行。需要调用execute方法才会真正执行。

flink shell会自动带有命令执行历史。

在shell命令行模式下添加外部依赖

可以将外部类路径添加到scala-shell中,当程序被调用的时候,这些外部依赖会自动的被发送到jobmanager上。

使用这个参数 -a 或者 --addclasspath  添加额外的依赖。

bin/start-scala-shell.sh [local | remote | yarn] --addclasspath

 

flink scala shell设置

查看scala shell模式提供的选型,可以执行这个命令:

bin/start-scala-shell.sh --help

 

本地模式local

 

使用shell连接一个本地集成的flink 集群 使用下面命令

bin/start-scala-shell.sh local

 

远程模式remote

 

使用scala shell 连接一个远程集群,使用host和port参数去连接指定的jobmanager

bin/start-scala-shell.sh remote

 

集群模式 yarn scala shell cluster

 

可以通过scala shell在yarn上启动一个专有的flink集群,yarn containers的数量可以通过参数-n 指定。shell在yarn上部署了一个新的集群并且连接到这个集群。你也可以指定集群的参数,例如:指定jobmanager的内存,yarn application的名称 等等。

例如:针对scala shell启动一个yarn集群包含两个taskmanager,使用下面的参数:

bin/start-scala-shell.sh yarn -n 2

针对所有的参数选项,可以在本节的最后查看完整的说明

yarn session模式

如果你之前已经使用flink yarn session模式启动了一个flink集群,scala shell可以使用下面的命令进行连接:

bin/start-scala-shell.sh yarn

 

完整的参数选项

 

Flink Scala Shell
用法: start-scala-shell.sh [local|remote|yarn] [options] ...
命令: local [options]
使用scala shell连接一个本地flink集群
-a | --addclasspath
指定flink使用的第三方依赖
命令: remote [options]
启动scala shell连接一个远程集群

主机名

端口号
-a | --addclasspath
指定flink使用的第三方依赖
命令: yarn [options]
使用flink连接一个yarn集群
-n arg | --container arg
分配的yarn container的数量 (等于TaskManagers的数量)
-jm arg | --jobManagerMemory arg
JobManager container 的内存[in MB]
-nm | --name
在YARN上给应用设置一个名字
-qu | --queue
指定YARN队列
-s | --slots
指定每个TaskManager的slot数量
-tm | --taskManagerMemory
TaskManager container的内存 [in MB]
-a | --addclasspath
指定flink使用的第三方jar
--configDir
配置文件目录.
-h | --help
打印帮助信息

 

获取更多大数据资料,视频以及技术交流请加群:

 

 


推荐阅读
  • Spring框架的核心组件与架构解析 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 在Android平台中,播放音频的采样率通常固定为44.1kHz,而录音的采样率则固定为8kHz。为了确保音频设备的正常工作,底层驱动必须预先设定这些固定的采样率。当上层应用提供的采样率与这些预设值不匹配时,需要通过重采样(resample)技术来调整采样率,以保证音频数据的正确处理和传输。本文将详细探讨FFMpeg在音频处理中的基础理论及重采样技术的应用。 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 投融资周报 | Circle 达成 4 亿美元融资协议,唯一艺术平台 A 轮融资超千万美元 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • SparkMLlib提供了一些基本的统计学的算法,下面主要说明一下:1、Summarystatistics对于RDD[Vector]类型,SparkMLlib提供了colStats ... [详细]
  • 工作原理_一文理解 Spark 基础概念及工作原理
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了一文理解Spark基础概念及工作原理相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • 如何在Spark数据排序过程中有效避免内存溢出(OOM)问题
    本文深入探讨了在使用Spark进行数据排序时如何有效预防内存溢出(OOM)问题。通过具体的代码示例,详细阐述了优化策略和技术手段,为读者在实际工作中遇到类似问题提供了宝贵的参考和指导。 ... [详细]
author-avatar
流浪汉中的小男人
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有