热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink学习2安装和启动

Flink学习2-安装和启动

0x00 系列文章目录

  1. Flink学习1-基础概念
  2. Flink学习2-安装和启动
  3. Flink系列3-API介绍

0x01 摘要

本篇文章主要讲解Flink下载、安装和启动的步骤。

0x02 下载

关于下载的更多信息可参考Flink官网

如果是用的MacOS X,可以直接用Homebrew安装:

brew install apache-flink

当前最新稳定的版本是v1.6.1。Flink可以不依赖Hadoop,但我们环境中要把Flink跑在Yarn上,所以需要下载Flink With Hadoop的版本的tgz包:

  • Flink with Hadoop® 2.7-binary
  • Flink with Hadoop® 2.6-binary
  • Flink 1.6.1-source

0x03 安装

只需直接解压即可

 $ tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
 $ cd flink-1.6.1
 $ bin/flink --version
 Version: 1.6.1, Commit ID: 23e2636

懒人可以设置一个PATH,以便以后在任意路径可以直接使用flink命令:

$ vim ~/.bash_profile
# 增加以下内容
PATH="/Users/chengc/cc/apps/flink-1.6.1/bin:${PATH}"
export PATH

保存后可以试试看:

$ flink -v
Version: 1.6.1, Commit ID: 23e2636

0x04 Flink集群启动

4.1 Flink集群的启动

通过简单命令就能在本地启动一个Flink集群

$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host chengcdeMacBook-Pro.local.
Starting taskexecutor daemon on host chengcdeMacBook-Pro.local.

看到以上信息代表Flink启动成功,我们可以通过jps来看看启动了哪些进程:

$ jps
70673 TaskManagerRunner
70261 StandaloneSessionClusterEntrypoint
70678 Jps
69647 Launcher
69646 NailgunRunner

可以看到分别启动了好几个Flink的重要组件,如果你看了第一章应该了解他们的作用。

4.2 Flink监控页面

我们可以通过访问http://localhost:8081看看效果:
Flink学习2-安装和启动
可以从flink的web界面上看到现在运行了一个Task Manager实例。

4.3 Flink集群日志

还可以通过查看日志看到flink服务器正常启动:

tail -100f log/flink-*-standalonesession-*.log

4.4 Flink集群的停止

通过简单命令就能停止Flink集群:

$ ./bin/stop-cluster.sh

0x05 示例

5.1 Maven

以下的依赖分为Java版和Scala版。这些依赖包括Flink本地运行环境所以可以在本地运行调试我们的Flink代码。

5.1.1 For Java

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-javaartifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-java_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-clients_2.11artifactId>
  <version>1.6.1version>
dependency>

5.1.2 For Scala

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-scala_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-scala_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-clients_2.11artifactId>
  <version>1.6.1version>
dependency>

5.2 Code

5.2.1 Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * 

This program connects to a server socket and reads strings from the socket. * The easiest way to try this out is to open a text server (at port 12345) * using the netcat tool via *

 * nc -l 12345
 * 
* and run this example with the hostname and the port as arguments. */
@SuppressWarnings("serial") public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount " + "--hostname --port ', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l ' and " + "type the input text into the command line"); return; } // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // ------------------------------------------------------------------------ /** * Data type for words with count. */ public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }

5.2.2 scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Implements a streaming windowed version of the "WordCount" program.
 * 
 * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text sever (at port 12345) 
 * using the ''netcat'' tool via
 * {{{
 * nc -l 12345
 * }}}
 * and run this example with the hostname and the port as arguments..
 */
object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]) : Unit = {

    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
          "--hostname  --port ', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l ' " +
          "and type the input text into the command line")
        return
      }
    }
    
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts 
    val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}

5.3 打包

将文件打包为jar

flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

5.4 启动示例程序

以上代码所写的程序功能是从socket中读取文本,然后每隔5秒打印出每个单词在当前时间往前推5秒的时间窗口内的出现次数。

5.4.1 启动netcat

9999端口启动本地netcat服务:

$ nc -l 9999

5.4.2 提交Flink应用

$ flink run /Users/chengc/cc/work/projects/flink-demo/target/SocketWindowWordCount-jar-with-dependencies.jar --port 9999
# 看到控制台输出以下信息代表任务提交成功
Starting execution of program

现在我们看看前面提到过的flink web界面:
Flink学习2-安装和启动

点击这行job信息能看到job详情页:
Flink学习2-安装和启动

5.4.3 测试Flink应用

通过以上步骤我们建立了Flink应用和9999端口的关系,现在我们试试再nc界面输入一些字符串:

$ nc -lk 9999
i am a chinese
who are you
how do you do
how do you do

与此同时,我们使用tailf 查看flink 应用的输出:

$  tail -f log/flink-*-taskexecutor-*.out
i : 1
chine : 1
a : 1
am : 1
who : 1
you : 1
are : 1
how : 2
you : 2
do : 4

可以看到 ,示例程序以翻滚窗口(tumbling window)的形式每隔5秒将前5秒的数据进行了字符统计。

0xFE 总结

本篇文章主要讲了下Flink的安装和示例程序的提交,希望大家有所收获。

下一章我们学习下Flink的API,看看Flink作者是怎么抽象API的:
Flink系列3-API介绍

0xFF 参考文档

Flink-Quickstart


推荐阅读
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • 在安装 iOS 开发所需的 CocoaPods 时,用户可能会遇到多种问题。其中一个常见问题是,在执行 `pod setup` 命令后,系统无法连接到 GitHub 以更新 CocoaPods/Specs 仓库。这可能是由于网络连接不稳定、GitHub 服务器暂时不可用或本地配置错误等原因导致。为解决此问题,建议检查网络连接、确保 GitHub API 限制未被触发,并验证本地配置文件是否正确。 ... [详细]
  • PHP预处理常量详解:如何定义与使用常量 ... [详细]
  • Amoeba 通过优化 MySQL 的读写分离功能显著提升了数据库性能。作为一款基于 MySQL 协议的代理工具,Amoeba 能够高效地处理应用程序的请求,并根据预设的规则将 SQL 请求智能地分配到不同的数据库实例,从而实现负载均衡和高可用性。该方案不仅提高了系统的并发处理能力,还有效减少了主数据库的负担,确保了数据的一致性和可靠性。 ... [详细]
  • 数字图书馆近期展出了一批精选的Linux经典著作,这些书籍虽然部分较为陈旧,但依然具有重要的参考价值。如需转载相关内容,请务必注明来源:小文论坛(http://www.xiaowenbbs.com)。 ... [详细]
  • 本文介绍了 Vue 开发的入门指南,重点讲解了开发环境的配置与项目的基本搭建。推荐使用 WebStorm 作为 IDE,其下载地址为 。安装时请选择适合您操作系统的版本,并通过 获取激活码。WebStorm 是前端开发者的理想选择,提供了丰富的功能和强大的代码编辑能力。 ... [详细]
  • 本文介绍了如何在 Windows 系统上利用 Docker 构建一个包含 NGINX、PHP、MySQL、Redis 和 Elasticsearch 的集成开发环境。通过详细的步骤说明,帮助开发者快速搭建和配置这一复杂的技术栈,提升开发效率和环境一致性。 ... [详细]
  • 本文探讨了 Java 中 Pair 类的历史与现状。虽然 Java 标准库中没有内置的 Pair 类,但社区和第三方库提供了多种实现方式,如 Apache Commons 的 Pair 类和 JavaFX 的 javafx.util.Pair 类。这些实现为需要处理成对数据的开发者提供了便利。此外,文章还讨论了为何标准库未包含 Pair 类的原因,以及在现代 Java 开发中使用 Pair 类的最佳实践。 ... [详细]
  • 本文详细介绍了在 Vue.js 前端框架中集成 vue-i18n 插件以实现多语言支持的方法。通过具体的配置步骤和示例代码,帮助开发者快速掌握如何在项目中实现国际化功能,提升用户体验。同时,文章还探讨了常见的多语言切换问题及解决方案,为开发人员提供了实用的参考。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • Squaretest:自动生成功能测试代码的高效插件
    本文将介绍一款名为Squaretest的高效插件,该工具能够自动生成功能测试代码。使用这款插件的主要原因是公司近期加强了代码质量的管控,对各项目进行了严格的单元测试评估。Squaretest不仅提高了测试代码的生成效率,还显著提升了代码的质量和可靠性。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
author-avatar
年庚瑶
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有