热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用Flink集群环境进行数据处理

前言上篇文章记录了搭建分布式Flink集群环境的过程搭建Flink集群环境这篇文章咱们聊一聊Flink客户端如何对接Flink集群环境的过程示例:Flink读取Had

前言

上篇文章记录了搭建分布式Flink集群环境的过程 搭建Flink集群环境

这篇文章咱们聊一聊Flink客户端如何对接Flink集群环境的过程

示例:Flink读取Hadoop中的文件 然后通过集群环境进行数据处理的过程

Hadoop

Hadoop集群环境搭建

搭建大数据运行环境之一

搭建大数据运行环境之二

Hadoop集群端口说明

Hadoop集群搭建过程异常情况

不能格式化存储目录

详细异常信息

org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown:
192.168.84.132:8485: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty

journalnode的端口是8485

处理方式

每一个hadoop journalnode节点上将指定目录删除即可

rm -rf /usr/local/hadoop/jn/data/nameservices001

上传文件到hdfs

cd /usr/local/hadoop/sbin
# 创建文件夹
hdfs dfs -mkdir /hdfsdata
# 文件
sudo vi /home/aaa.txt
# 上传文件到指定文件夹
hdfs dfs -put /home/aaa.txt  /hdfsdata

上传文件异常

Hadoop DataNode 节点启不来

详细异常信息

File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). 
There are 0 datanode(s) running and no node(s) are excluded in this operation

查看WebUI DataNode情况

http://192.168.84.128:50070/dfshealth.html#tab-datanode

解决方法一
停止集群

cd /usr/local/hadoop/sbin
./stop-all.sh

删除在hdfs中配置的data目录
  • 查看data目录

在core-site.xml中配置的hadoop.tmp.dir对应文件件

cat /usr/local/hadoop/etc/hadoop/core-site.xml 

  • 删除

rm -rf /usr/local/hadoop/tmp/*

重新格式化

./hadoop namenode -format

重新启动集群

./start-all.sh

解决方法二

如果上面的方法还是不能启动DataNode那么使用这个方法

当执行文件系统格式化时
会在namenode数据文件夹
(即配置文件中dfs.name.dir在本地系统的路径)
中保存一个current/VERSION文件
记录namespaceID
标志了所有格式化的namenode版本
如果我们频繁的格式化namenode
那么datanode中保存(即dfs.data.dir在本地系统的路径)的current/VERSION文件只是你地第一次格式化时保存的namenode的ID
因此就会造成namenode和datanode之间的ID不一致

  • 解决方法A:(推荐)

删除DataNode的所有资料及将集群中每个datanode节点的/dfs/data/current中的VERSION删除
然后重新执行hadoop namenode -format进行格式化
重启集群,错误消失

  • 解决方法B:

将name/current下的VERSION中的clusterID复制到data/current下的VERSION中,覆盖掉原来的clusterID

查看DataNode情况

DataNode已经起来了

查看上传文件

http://192.168.84.128:50070

该文件路径

hdfs://192.168.84.128:8020/hdfsdata/aaa.txt

Flink读取数据源并处理数据

DEMO源码

https://gitee.com/pingfanrenbiji/flink-examples-streaming

Flink读取hdfs文件并处理数据

创建flink执行环境

  • 第一个参数:远程flink集群 jobmanager ip地址
  • 第二个参数:8081是jobmanager webui端口
  • 第三个参数:是当前文件夹所在的jar包

数据源

读取hdfs文件数据

各种算子简介

以单词计数为例

先要将字符串数据解析成单词和次数 使用tuple2表示
第一个字段是单词 第二个字段是次数
次数初始值设置成1

flatmap

flatmap来做解析的工作
一行数据可能有多个单词

keyBy


将数据流按照单词字段即0号索引字段做分组
keyBy(int index) 得到一个以单词为key的tuple2数据流

timeWindow


在流上指定想要的窗口
并根据窗口中的数据计算结果
每5秒聚合一次单词数
每个窗口都是从零开始统计的

timeWindow 指定想要5秒的翻滚窗口(Tumble)

sum


第三个调用为每个key每个窗口指定了sum聚合函数
按照次数字段(即1号索引字段想家)
得到结果数据流
将每5秒输出一次 这5秒内每个单词出现的次数

将数据打印到控制台

所有算子操作(创建源、聚合、打印)只是构建了内部算子操作的图形

只有在execute被调用时才会在提交到集群或本地计算机上执行

执行报错 找不到代码异常

具体异常信息

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2

解决方法

  • 将当前目录文件夹打包成jar包

使用maven插件maven-jar-plugin

  • 第三个参数指向该jar包

在FLink Web UI查看该任务的执行过程

编译异常

无效的标记

--add-exports=java.base/sun.net.util=ALL-UNNAMED

不支持hdfs文件系统

具体异常信息

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded

处理方式

  • 下载 flink hadoop资源jar包

https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar

  • 放入flink 安装包 lib目录下

每个节点都需要放上该jar包 然后重启flink集群环境

当前操作节点hadoop namenode节点为standby状态

具体详细信息

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby

解决方法

重新格式化2个namenode节点即可

具体详见

搭建大数据运行环境之二

遗留问题

flink数据源来自于socket数据

启动socket服务并输入数据

问题是

Flink并没有监听到该socket数据
暂时还没有找到原因 
了解的朋友们请联系我 
指导我一下哦

如果本地环境是可以监听到的

后记

为了解决这个问题
我请教了下 “Apache Flink China社区”钉钉群里面的谢波老师
他告诉我:

通过java或scala一般创建本地执行环境 即

'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();'

很少有

'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(ip,port,jarfiles);'

这样用的

若使用flink分布式环境 那么通过web ui界面 上传jar包的方式来完成

这也就解释了为什么我没有找到相关资料
只能靠自己'摸着石头过河'

结语

在了解一件新事物的时候 
按照自己的想法 一番努力和挣扎之后
也许方向是错误的 
但也会对它更进一步的了解了


推荐阅读
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • 本文介绍了禅道作为一款国产开源免费的测试管理工具的特点和功能,并提供了禅道的搭建和调试方法。禅道是一款B/S结构的项目管理工具,可以实现组织管理、后台管理、产品管理、项目管理和测试管理等功能。同时,本文还介绍了其他软件测试相关工具,如功能自动化工具和性能自动化工具,以及白盒测试工具的使用。通过本文的阅读,读者可以了解禅道的基本使用方法和优势,从而更好地进行测试管理工作。 ... [详细]
author-avatar
禾漾啊
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有