热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink学习2安装和启动

Flink学习2-安装和启动

0x00 系列文章目录

  1. Flink学习1-基础概念
  2. Flink学习2-安装和启动
  3. Flink系列3-API介绍

0x01 摘要

本篇文章主要讲解Flink下载、安装和启动的步骤。

0x02 下载

关于下载的更多信息可参考Flink官网

如果是用的MacOS X,可以直接用Homebrew安装:

brew install apache-flink

当前最新稳定的版本是v1.6.1。Flink可以不依赖Hadoop,但我们环境中要把Flink跑在Yarn上,所以需要下载Flink With Hadoop的版本的tgz包:

  • Flink with Hadoop® 2.7-binary
  • Flink with Hadoop® 2.6-binary
  • Flink 1.6.1-source

0x03 安装

只需直接解压即可

 $ tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
 $ cd flink-1.6.1
 $ bin/flink --version
 Version: 1.6.1, Commit ID: 23e2636

懒人可以设置一个PATH,以便以后在任意路径可以直接使用flink命令:

$ vim ~/.bash_profile
# 增加以下内容
PATH="/Users/chengc/cc/apps/flink-1.6.1/bin:${PATH}"
export PATH

保存后可以试试看:

$ flink -v
Version: 1.6.1, Commit ID: 23e2636

0x04 Flink集群启动

4.1 Flink集群的启动

通过简单命令就能在本地启动一个Flink集群

$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host chengcdeMacBook-Pro.local.
Starting taskexecutor daemon on host chengcdeMacBook-Pro.local.

看到以上信息代表Flink启动成功,我们可以通过jps来看看启动了哪些进程:

$ jps
70673 TaskManagerRunner
70261 StandaloneSessionClusterEntrypoint
70678 Jps
69647 Launcher
69646 NailgunRunner

可以看到分别启动了好几个Flink的重要组件,如果你看了第一章应该了解他们的作用。

4.2 Flink监控页面

我们可以通过访问http://localhost:8081看看效果:
Flink学习2-安装和启动
可以从flink的web界面上看到现在运行了一个Task Manager实例。

4.3 Flink集群日志

还可以通过查看日志看到flink服务器正常启动:

tail -100f log/flink-*-standalonesession-*.log

4.4 Flink集群的停止

通过简单命令就能停止Flink集群:

$ ./bin/stop-cluster.sh

0x05 示例

5.1 Maven

以下的依赖分为Java版和Scala版。这些依赖包括Flink本地运行环境所以可以在本地运行调试我们的Flink代码。

5.1.1 For Java

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-javaartifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-java_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-clients_2.11artifactId>
  <version>1.6.1version>
dependency>

5.1.2 For Scala

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-scala_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-scala_2.11artifactId>
  <version>1.6.1version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-clients_2.11artifactId>
  <version>1.6.1version>
dependency>

5.2 Code

5.2.1 Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * 

This program connects to a server socket and reads strings from the socket. * The easiest way to try this out is to open a text server (at port 12345) * using the netcat tool via *

 * nc -l 12345
 * 
* and run this example with the hostname and the port as arguments. */
@SuppressWarnings("serial") public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount " + "--hostname --port ', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l ' and " + "type the input text into the command line"); return; } // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // ------------------------------------------------------------------------ /** * Data type for words with count. */ public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }

5.2.2 scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Implements a streaming windowed version of the "WordCount" program.
 * 
 * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text sever (at port 12345) 
 * using the ''netcat'' tool via
 * {{{
 * nc -l 12345
 * }}}
 * and run this example with the hostname and the port as arguments..
 */
object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]) : Unit = {

    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
          "--hostname  --port ', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l ' " +
          "and type the input text into the command line")
        return
      }
    }
    
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts 
    val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}

5.3 打包

将文件打包为jar

flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

5.4 启动示例程序

以上代码所写的程序功能是从socket中读取文本,然后每隔5秒打印出每个单词在当前时间往前推5秒的时间窗口内的出现次数。

5.4.1 启动netcat

9999端口启动本地netcat服务:

$ nc -l 9999

5.4.2 提交Flink应用

$ flink run /Users/chengc/cc/work/projects/flink-demo/target/SocketWindowWordCount-jar-with-dependencies.jar --port 9999
# 看到控制台输出以下信息代表任务提交成功
Starting execution of program

现在我们看看前面提到过的flink web界面:
Flink学习2-安装和启动

点击这行job信息能看到job详情页:
Flink学习2-安装和启动

5.4.3 测试Flink应用

通过以上步骤我们建立了Flink应用和9999端口的关系,现在我们试试再nc界面输入一些字符串:

$ nc -lk 9999
i am a chinese
who are you
how do you do
how do you do

与此同时,我们使用tailf 查看flink 应用的输出:

$  tail -f log/flink-*-taskexecutor-*.out
i : 1
chine : 1
a : 1
am : 1
who : 1
you : 1
are : 1
how : 2
you : 2
do : 4

可以看到 ,示例程序以翻滚窗口(tumbling window)的形式每隔5秒将前5秒的数据进行了字符统计。

0xFE 总结

本篇文章主要讲了下Flink的安装和示例程序的提交,希望大家有所收获。

下一章我们学习下Flink的API,看看Flink作者是怎么抽象API的:
Flink系列3-API介绍

0xFF 参考文档

Flink-Quickstart


推荐阅读
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 深入解析Spark核心架构与部署策略
    本文详细探讨了Spark的核心架构,包括其运行机制、任务调度和内存管理等方面,以及四种主要的部署模式:Standalone、Apache Mesos、Hadoop YARN和Kubernetes。通过本文,读者可以深入了解Spark的工作原理及其在不同环境下的部署方式。 ... [详细]
  • 本文介绍了一款用于自动化部署 Linux 服务的 Bash 脚本。该脚本不仅涵盖了基本的文件复制和目录创建,还处理了系统服务的配置和启动,确保在多种 Linux 发行版上都能顺利运行。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 本文详细介绍了如何准备和安装 Eclipse 开发环境及其相关插件,包括 JDK、Tomcat、Struts 等组件的安装步骤及配置方法。 ... [详细]
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • 本文详细介绍了macOS系统的核心组件,包括如何管理其安全特性——系统完整性保护(SIP),并探讨了不同版本的更新亮点。对于使用macOS系统的用户来说,了解这些信息有助于更好地管理和优化系统性能。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • Google最新推出的嵌入AI技术的便携式相机Clips现已上架,旨在通过人工智能技术自动捕捉用户生活中值得纪念的时刻,帮助人们减少照片数量过多的问题。 ... [详细]
  • 本文详细记录了在银河麒麟操作系统和龙芯架构上使用 Qt 5.15.2 进行项目打包时遇到的问题及解决方案,特别关注于 linuxdeployqt 工具的应用。 ... [详细]
author-avatar
年庚瑶
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有