热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【大数据开发】SparkStreaming基于HDFS的实时计算开发(四)

基于HDFS文件的实时计算,其实就是监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当

4


Part

4

Spark Streaming 基于HDFS的实时计算开发

Socket和HDFS文件的原理

关于集群同步的一些注意事项

编写本地源数据并创建HDFS文件目录

编写基于HDFS源端的WordCount实时处理程序

上传本地文件并展示实时结果






Socket和HDFS文件的原理

Socket和HDFS文件:



Socket:

    第二章的wordcount例子已经演示过了。

StreamingContext.socketTextStream ()


HDFS文件:

    基于HDFS文件的实时计算,其实就是监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
streamingContext.textFileStream[KeyClass, ValueClass, InputFormatClass](dataDirecto

    Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core,所以可以写成local[1]。




关于集群同步的一些注意事项

关于集群同步的时间问题:


修改主机时间,使源数据节点时间必须与计算节点时间保持同步:

# sudo date -s 2020-01-14
sudo date -s 08:00:00

修改hadoop配置文件:

# sudo vi hadoop-env.sh
......... 
export HADOOP_OPTS="$HADOOP_OPTS-Duser.timezOne=GMT+08"
......... 

# sudo vi yarn-env.sh
.........
YARN_OPTS="$YARN_OPTS -Duser.timezOne=GMT+08"
.........

# sudo vi hbase-env.sh
.........
export TZ="Asia/Shanghai"
........




创建本地源数据并创建HDFS文件目录

本地创建源数据文件:

# vi test.txt
hadoop hive
hbase hive
spark oracle flink spark
spark hbase

查看文件:


创建输入流数据源目录:

[kfk@bigdata-pro-m01 hadoop-2.7.0]$ bin/hdfs dfs -mkdir -p /user/spark/datas/sparkstreaming




编写基于HDFS源端的WordCount实时处理程序

//Java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;


import java.util.Arrays;


public class HDFSWordCountJava {
public static void main(String[] args) throws Exception{
SparkConf cOnf= new SparkConf().setMaster("local[1]").setAppName("HDFSworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));  //每隔3秒处理数据
String filePath = "hdfs://bigdata-pro-m01.kfk.com:9000/user/spark/datas/sparkstreaming";


JavaDStream<String> lines = jssc.textFileStream(filePath);


JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());


JavaPairDStream<String,Integer> pair = words.mapToPair(word -> new Tuple2<>(word,1));


JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> (x + y));


        wordcount.print();
jssc.start();
jssc.awaitTermination();
}
}

//Scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}


object HDFSWordCountScala {
def main(args: Array[String]): Unit = {
val cOnf= new SparkConf().setMaster("local[1]").setAppName("HDFSworkWordCount")


val jssc = new StreamingContext(conf, Durations.seconds(3))


val filePath: String = "hdfs://bigdata-pro-m01.kfk.com:9000/user/spark/datas/sparkstreaming" //sparkstreaming目录下新增的所有文件都会被处理


val lines = jssc.textFileStream(filePath)


val words = lines.flatMap(line => line.split(" "));


val pair = words.map(word => (word,1))


val wordcount = pair.reduceByKey((x , y) => x + y)


wordcount.print()
jssc.start()
jssc.awaitTermination()
}
}




上传本地文件并展示实时结果


  1. 启动程序


  2. 上传HDFS文件

bin/hdfs dfs -put /opt/datas/test.txt /user/spark/datas/sparkstreaming
bin/hdfs dfs -copyFromLocal /opt/datas/test.txt /user/spark/datas/sparkstreaming/1.txt

    这里为什么上传了这么多文件没有执行成功呢?这是因为开始没有同步集群时间导致Last Modified异常(20200114测试的),计算节点无法实时读取数据。最后修改时间后重启namenode和datanode服务,重新上传6.txt文件才被计算节点处理。


    3. 打印结果


    总结一下,在实际企业中HDFS用于实时数据的传输比较少,这是因为数据落地到HDFS分布式文件系统需要一段时间,从而影响了时效性,所以分布式存储文件多用于大数据离线处理。而数据实时性处理还是要Kafka这种响应快速的中间件来映射。







微信号 : BIGDT_IN

● 扫码关注我们





推荐阅读
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • MR程序的几种提交运行模式本地模型运行1在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行-- ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • IOS开发之短信发送与拨打电话的方法详解
    本文详细介绍了在IOS开发中实现短信发送和拨打电话的两种方式,一种是使用系统底层发送,虽然无法自定义短信内容和返回原应用,但是简单方便;另一种是使用第三方框架发送,需要导入MessageUI头文件,并遵守MFMessageComposeViewControllerDelegate协议,可以实现自定义短信内容和返回原应用的功能。 ... [详细]
  • SpringMVC接收请求参数的方式总结
    本文总结了在SpringMVC开发中处理控制器参数的各种方式,包括处理使用@RequestParam注解的参数、MultipartFile类型参数和Simple类型参数的RequestParamMethodArgumentResolver,处理@RequestBody注解的参数的RequestResponseBodyMethodProcessor,以及PathVariableMapMethodArgumentResol等子类。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  •        在搭建Hadoop环境之前,请先阅读如下博文,把搭建Hadoop环境之前的准备工作做好,博文如下:       1、CentOS6.7下安装JDK,地址:http:b ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
author-avatar
枫叶
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有