热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink(三)IDEA开发Flink环境搭建与测试

一.IDEA开发环境1.pom文件设置1.8

一.IDEA开发环境


1.pom文件设置


1.8
1.8
UTF-8
2.11.12
2.11
2.7.6
1.6.1



org.scala-lang
scala-library
${scala.version}


org.apache.flink
flink-java
${flink.version}


org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}


org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}


org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}


org.apache.flink
flink-table_${scala.binary.version}
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}


org.apache.flink
flink-connector-kafka-0.10_${scala.binary.version}
${flink.version}


org.apache.hadoop
hadoop-client
${hadoop.version}


mysql
mysql-connector-java
5.1.38


com.alibaba
fastjson
1.2.22



src/main/scala
src/test/scala



net.alchim31.maven
scala-maven-plugin
3.2.0



compile
testCompile




-dependencyfile
${project.build.directory}/.scala_dependencies






org.apache.maven.plugins
maven-surefire-plugin
2.18.1

false
true

**/*Test.*
**/*Suite.*




org.apache.maven.plugins
maven-shade-plugin
3.0.0


package

shade




*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA





org.apache.spark.WordCount








2.flink开发流程

Flink具有特殊类DataSetDataStream在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet数据有限的情况下,对于一个DataStream元素的数量可以是无界的。

这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。你也不能简单地检查里面的元素。

集合最初通过在弗林克程序添加源创建和新的集合从这些通过将它们使用API方法如衍生mapfilter等等。

Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

1.获取execution environment,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2.加载/创建初始化数据

DataStream text = env.readTextFile("file:///path/to/file");

3.指定此数据的转换

val mapped = input.map { x => x.toInt }

4.指定放置计算结果的位置

writeAsText(String path)

print()

5.触发程序执行

在local模式下执行程序

execute()

将程序达成jar运行在线上

./bin/flink run \

-m node21:8081 \

./examples/batch/WordCount.jar \

--input  hdfs:///user/admin/input/wc.txt \

--output  hdfs:///user/admin/output2  \


二. Wordcount案例


1.Scala代码

package com.xyg.streaming
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Author: Mr.Deng
* Date: 2018/10/15
* Desc:
*/
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// 定义一个数据类型保存单词出现的次数
case class WordWithCount(word: String, count: Long)
// port 表示需要连接的端口
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port

'")
return
}
}
// 获取运行环境
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
// 连接此socket获取输入数据
val text = env.socketTextStream("node21", port, '\n')
//需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
import org.apache.flink.api.scala._
// 解析数据, 分组, 窗口化, 并且聚合求SUM
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// 打印输出并设置使用一个并行度
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
}

2.Java代码

package com.xyg.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Author: Mr.Deng
* Date: 2018/10/15
* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
* 先在node21机器上执行nc -l 9000
*/
public class StreamingWindowWordCountJava {
public static void main(String[] args) throws Exception {
//定义socket的端口号
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("没有指定port参数,使用默认值9000");
port = 9000;
}
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
DataStreamSource text = env.socketTextStream("node21", port, "\n");
//计算数据
DataStream windowCount = text.flatMap(new FlatMapFunction() {
public void flatMap(String value, Collector out) throws Exception {
String[] splits = value.split("\\s");
for (String word:splits) {
out.collect(new WordWithCount(word,1L));
}
}
})//打平操作,把每行的单词转为类型的数据
//针对相同的word数据进行分组
.keyBy("word")
//指定计算数据的窗口大小和滑动窗口大小
.timeWindow(Time.seconds(2),Time.seconds(1))
.sum("count");
//把数据打印到控制台,使用一个并行度
windowCount.print().setParallelism(1);
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
env.execute("streaming word count");
}
/**
* 主要为了存储单词以及单词出现的次数
*/
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

3.运行测试

首先,使用nc命令启动一个本地监听,命令是:

[admin@node21 ~]$ nc -l 9000

通过netstat命令观察9000端口。 netstat -anlp | grep 9000,启动监听如果报错:-bash: nc: command not found,请先安装nc,在线安装命令:yum -y install nc

然后,IDEA上运行flink官方案例程序

node21上输入

IDEA控制台输出如下


4.集群测试

这里单机测试官方案例

[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
2100 StandaloneSessionClusterEntrypoint
2518 TaskManagerRunner
2584 Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:

单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到stdout。监视TaskManager的输出文件并写入一些文本nc(输入在点击后逐行发送到Flink):


三. 使用IDEA开发离线程序

Dataset是flink的常用程序,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)将数据集转成,然后通过sink进行存储,既可以写入hdfs这种分布式文件系统,也可以打印控制台,flink可以有很多种运行方式,如local、flink集群、yarn等.


1. scala程序

import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]) {
//初始化环境
val env = ExecutionEnvironment.getExecutionEnvironment
//从字符串中加载数据
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
//分割字符串、汇总tuple、按照key进行分组、统计分组后word个数
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
//打印
counts.print()
}
}

2. java程序

package com.xyg.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Author: Mr.Deng
* Date: 2018/10/19
* Desc:
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//构建环境
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//通过字符串构建数据集
DataSet text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
//分割字符串、按照key进行分组、统计相同的key个数
DataSet> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
//打印
wordCounts.print();
}
//分割字符串的方法
public static class LineSplitter implements FlatMapFunction> {
@Override
public void flatMap(String line, Collector> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2(word, 1));
}
}
}
}

3.运行

 


推荐阅读
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 本文详细介绍了如何配置Apache Flume与Spark Streaming,实现高效的数据传输。文中提供了两种集成方案,旨在帮助用户根据具体需求选择最合适的配置方法。 ... [详细]
  • window下kafka的安装以及测试
    目录一、安装JDK(需要安装依赖javaJDK)二、安装Kafka三、测试参考在Windows系统上安装消息队列kafka一、安装JDKÿ ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • Scala 实现 UTF-8 编码属性文件读取与克隆
    本文介绍如何使用 Scala 以 UTF-8 编码方式读取属性文件,并实现属性文件的克隆功能。通过这种方式,可以确保配置文件在多线程环境下的一致性和高效性。 ... [详细]
  • 解决JAX-WS动态客户端工厂弃用问题并迁移到XFire
    在处理Java项目中的JAR包冲突时,我们遇到了JaxWsDynamicClientFactory被弃用的问题,并成功将其迁移到org.codehaus.xfire.client。本文详细介绍了这一过程及解决方案。 ... [详细]
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • 深入解析:OpenShift Origin环境下的Kubernetes Spark Operator
    本文探讨了如何在OpenShift Origin平台上利用Kubernetes Spark Operator来管理和部署Apache Spark集群与应用。作为Radanalytics.io项目的一部分,这一开源工具为大数据处理提供了强大的支持。 ... [详细]
  • 本文详细探讨了如何在 SparkSQL 中创建 DataFrame,涵盖了从基本概念到具体实践的各种方法。作为持续学习的一部分,本文将持续更新以提供最新信息。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文介绍了如何通过 Maven 依赖引入 SQLiteJDBC 和 HikariCP 包,从而在 Java 应用中高效地连接和操作 SQLite 数据库。文章提供了详细的代码示例,并解释了每个步骤的实现细节。 ... [详细]
  • 实体映射最强工具类:MapStruct真香 ... [详细]
  • 探讨了小型企业在构建安全网络和软件时所面临的挑战和机遇。本文介绍了如何通过合理的方法和工具,确保小型企业能够有效提升其软件的安全性,从而保护客户数据并增强市场竞争力。 ... [详细]
  • 本文介绍如何使用布局文件在Android应用中排列多行TextView和Button,使其占据屏幕的特定比例,并提供示例代码以帮助理解和实现。 ... [详细]
  • 本文详细介绍了如何准备和安装 Eclipse 开发环境及其相关插件,包括 JDK、Tomcat、Struts 等组件的安装步骤及配置方法。 ... [详细]
author-avatar
手机用户2502854041
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有