热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink学习2安装和启动

Flink学习2-安装和启动

0x00 系列文章目录

  1. Flink学习1-基础概念
  2. Flink学习2-安装和启动
  3. Flink系列3-API介绍

0x01 摘要

本篇文章主要讲解Flink下载、安装和启动的步骤。

0x02 下载

关于下载的更多信息可参考Flink官网

如果是用的MacOS X,可以直接用Homebrew安装:

brew install apache-flink

当前最新稳定的版本是v1.6.1。Flink可以不依赖Hadoop,但我们环境中要把Flink跑在Yarn上,所以需要下载Flink With Hadoop的版本的tgz包:

  • Flink with Hadoop® 2.7-binary
  • Flink with Hadoop® 2.6-binary
  • Flink 1.6.1-source

0x03 安装

只需直接解压即可

 $ tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
 $ cd flink-1.6.1
 $ bin/flink --version
 Version: 1.6.1, Commit ID: 23e2636

懒人可以设置一个PATH,以便以后在任意路径可以直接使用flink命令:

$ vim ~/.bash_profile
# 增加以下内容
PATH="/Users/chengc/cc/apps/flink-1.6.1/bin:${PATH}"
export PATH

保存后可以试试看:

$ flink -v
Version: 1.6.1, Commit ID: 23e2636

0x04 Flink集群启动

4.1 Flink集群的启动

通过简单命令就能在本地启动一个Flink集群

$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host chengcdeMacBook-Pro.local.
Starting taskexecutor daemon on host chengcdeMacBook-Pro.local.

看到以上信息代表Flink启动成功,我们可以通过jps来看看启动了哪些进程:

$ jps
70673 TaskManagerRunner
70261 StandaloneSessionClusterEntrypoint
70678 Jps
69647 Launcher
69646 NailgunRunner

可以看到分别启动了好几个Flink的重要组件,如果你看了第一章应该了解他们的作用。

4.2 Flink监控页面

我们可以通过访问http://localhost:8081看看效果:
Flink学习2-安装和启动
可以从flink的web界面上看到现在运行了一个Task Manager实例。

4.3 Flink集群日志

还可以通过查看日志看到flink服务器正常启动:

tail -100f log/flink-*-standalonesession-*.log

4.4 Flink集群的停止

通过简单命令就能停止Flink集群:

$ ./bin/stop-cluster.sh

0x05 示例

5.1 Maven

以下的依赖分为Java版和Scala版。这些依赖包括Flink本地运行环境所以可以在本地运行调试我们的Flink代码。

5.1.1 For Java

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-javaartifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-java_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-clients_2.11artifactId>
  <version>1.6.1version>
dependency>

5.1.2 For Scala

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-scala_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-scala_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-clients_2.11artifactId>
  <version>1.6.1version>
dependency>

5.2 Code

5.2.1 Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * 

This program connects to a server socket and reads strings from the socket. * The easiest way to try this out is to open a text server (at port 12345) * using the netcat tool via *

 * nc -l 12345
 * 
* and run this example with the hostname and the port as arguments. */
@SuppressWarnings("serial") public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount " + "--hostname --port ', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l ' and " + "type the input text into the command line"); return; } // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // ------------------------------------------------------------------------ /** * Data type for words with count. */ public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }

5.2.2 scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Implements a streaming windowed version of the "WordCount" program.
 * 
 * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text sever (at port 12345) 
 * using the ''netcat'' tool via
 * {{{
 * nc -l 12345
 * }}}
 * and run this example with the hostname and the port as arguments..
 */
object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]) : Unit = {

    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
          "--hostname  --port ', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l ' " +
          "and type the input text into the command line")
        return
      }
    }
    
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts 
    val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}

5.3 打包

将文件打包为jar

flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

5.4 启动示例程序

以上代码所写的程序功能是从socket中读取文本,然后每隔5秒打印出每个单词在当前时间往前推5秒的时间窗口内的出现次数。

5.4.1 启动netcat

9999端口启动本地netcat服务:

$ nc -l 9999

5.4.2 提交Flink应用

$ flink run /Users/chengc/cc/work/projects/flink-demo/target/SocketWindowWordCount-jar-with-dependencies.jar --port 9999
# 看到控制台输出以下信息代表任务提交成功
Starting execution of program

现在我们看看前面提到过的flink web界面:
Flink学习2-安装和启动

点击这行job信息能看到job详情页:
Flink学习2-安装和启动

5.4.3 测试Flink应用

通过以上步骤我们建立了Flink应用和9999端口的关系,现在我们试试再nc界面输入一些字符串:

$ nc -lk 9999
i am a chinese
who are you
how do you do
how do you do

与此同时,我们使用tailf 查看flink 应用的输出:

$  tail -f log/flink-*-taskexecutor-*.out
i : 1
chine : 1
a : 1
am : 1
who : 1
you : 1
are : 1
how : 2
you : 2
do : 4

可以看到 ,示例程序以翻滚窗口(tumbling window)的形式每隔5秒将前5秒的数据进行了字符统计。

0xFE 总结

本篇文章主要讲了下Flink的安装和示例程序的提交,希望大家有所收获。

下一章我们学习下Flink的API,看看Flink作者是怎么抽象API的:
Flink系列3-API介绍

0xFF 参考文档

Flink-Quickstart


推荐阅读
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 本文详细介绍了在Mac操作系统中使用Python连接MySQL数据库的方法,包括常见的错误处理及解决方案。 ... [详细]
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 在现代网络环境中,两台计算机之间的文件传输需求日益增长。传统的FTP和SSH方式虽然有效,但其配置复杂、步骤繁琐,难以满足快速且安全的传输需求。本文将介绍一种基于Go语言开发的新一代文件传输工具——Croc,它不仅简化了操作流程,还提供了强大的加密和跨平台支持。 ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 深入解析Spark核心架构与部署策略
    本文详细探讨了Spark的核心架构,包括其运行机制、任务调度和内存管理等方面,以及四种主要的部署模式:Standalone、Apache Mesos、Hadoop YARN和Kubernetes。通过本文,读者可以深入了解Spark的工作原理及其在不同环境下的部署方式。 ... [详细]
  • 本文详细介绍了如何配置Apache Flume与Spark Streaming,实现高效的数据传输。文中提供了两种集成方案,旨在帮助用户根据具体需求选择最合适的配置方法。 ... [详细]
  • ANSI最全介绍linux终端字体改变颜色等ANSI转义序列维基百科,自由的百科全书由于国内不能访问wiki而且国内关于ANSI的介绍都是简短的不能达到,不够完整所以转wiki到此 ... [详细]
  • Hadoop MapReduce 实战案例:手机流量使用统计分析
    本文通过一个具体的Hadoop MapReduce案例,详细介绍了如何利用MapReduce框架来统计和分析手机用户的流量使用情况,包括上行和下行流量的计算以及总流量的汇总。 ... [详细]
  • 在开发过程中,我最初也依赖于功能全面但操作繁琐的集成开发环境(IDE),如Borland Delphi 和 Microsoft Visual Studio。然而,随着对高效开发的追求,我逐渐转向了更加轻量级和灵活的工具组合。通过 CLIfe,我构建了一个高度定制化的开发环境,不仅提高了代码编写效率,还简化了项目管理流程。这一配置结合了多种强大的命令行工具和插件,使我在日常开发中能够更加得心应手。 ... [详细]
  • 在 Go 开发环境中,通过使用 iTerm 和 Oh My Zsh,可以显著提升终端操作的效率和体验。Oh My Zsh 是一个强大的 Zsh 配置管理框架,提供了丰富的插件支持,如代码高亮、自动补全和多种编程语言支持,同时还拥有众多美观的主题,使终端界面更加个性化和高效。Zsh 作为一种高度可定制的 shell,不仅适用于交互式应用,还可用作脚本解释器,集成了 bash、ksh 和 tcsh 等其他 shell 的诸多优点,并具备独特的功能特性。 ... [详细]
  • 在Java编程中,将字符串转换为整数类型时,必须确保该字符串表示的数值在int类型的取值范围内。如果超出范围,将会抛出异常。本文介绍如何安全地进行这种转换,并提供详细的代码示例。 ... [详细]
  • 实践指南:使用Express、Create React App与MongoDB搭建React开发环境
    本文详细介绍了如何利用Express、Create React App和MongoDB构建一个高效的React应用开发环境,旨在为开发者提供一套完整的解决方案,包括环境搭建、数据模拟及前后端交互。 ... [详细]
author-avatar
年庚瑶
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有