热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

7.4基本输入源

一、文件流1.在spark-shell中创建文件流进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell上面在spark-shell中执行的程序

一、文件流

技术分享图片


1.在spark-shell中创建文件流

技术分享图片

进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell 

技术分享图片

上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:

技术分享图片

在“/usr/local/spark/mycode/streaming/logfile”目录下新建一个log.txt文件,就可以在监听窗口中显示词频统计结果


2.采用独立应用程序方式创建文件流

在当前streaming下创建三级子目录,因为只有把代码放到src/main/scala目录下,sbt打包编译工具才能够正确运行。

技术分享图片

用vim编辑器新建一个TestStreaming.scala代码文件,请在里面输入以下代码:


import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
def main(args: Array[String]) {
val sparkCOnf= new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据
val ssc = new StreamingContext(sparkConf, Seconds(2))// 时间间隔为2秒
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile") //这里采用本地文件,当然你也可以采用HDFS文件
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

技术分享图片

技术分享图片

在simple.sbt文件中输入以下代码: 技术分享图片

执行sbt打包编译的命令如下:

技术分享图片

 打包成功以后,就可以输入以下命令启动这个程序:

技术分享图片



  1. 执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口)

  2. 切换到另外一个Shell窗口,在"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log2.txt文件,文件里面随便输入一些单词,保存好文件退出vim编辑器

  3. 再次切换回“监听窗口”,等待20秒以后,按键盘Ctrl+C或者Ctrl+D停止监听程序,就可以看到监听窗口的屏幕上会打印出单词统计信息


二、套接字流

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理


1.Socket工作原理

技术分享图片


2.使用套接字流作为数据源

技术分享图片

请在NetworkWordCount.scala文件中输入如下内容:(客户端向服务端发起连接,需要告诉它向哪个主机哪个端口发起连接)


package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length <2) {
System.err.println("Usage: NetworkWordCount ")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels() # 设置日志显示级别
val sparkCOnf= new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]") #生成sparkConf对象
val ssc = new StreamingContext(sparkConf, Seconds(1)) #生成一个StreamingContext对象
   # 1.定义输入数据流,args(0)是TCP服务端的主机名,args(1)是TCP服务端的端口号(字符串转整数)
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)#保存数据方式
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

技术分享图片技术分享图片

在相同目录下再新建另外一个代码文件StreamingExamples.scala,文件内容如下:


package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging { # 单例对象,不需要实例化,直接用它的静态方法
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark‘s default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}

技术分享图片

技术分享图片技术分享图片技术分享图片

 新打开一个窗口作为nc窗口,启动nc程序:

技术分享图片

可以在nc窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口每隔1秒就会打印出词频统计信息,大概会在屏幕上出现类似如下的结果:

技术分享图片


3.使用Socket编程实现自定义数据源

下面我们再前进一步,把数据源头的产生方式修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。

技术分享图片

 

 

技术分享图片

 

 

 技术分享图片

 

 

 


三、RDD队列流


推荐阅读
  • 本题要求在一组数中反复取出两个数相加,并将结果放回数组中,最终求出最小的总加法代价。这是一个经典的哈夫曼编码问题,利用贪心算法可以有效地解决。 ... [详细]
  • 本文介绍了如何通过Java代码计算一个整数的位数,并展示了多个基础编程示例,包括求和、平均分计算、条件判断等。 ... [详细]
  • 探索新一代API文档工具,告别Swagger的繁琐
    对于后端开发者而言,编写和维护API文档既繁琐又不可或缺。本文将介绍一款全新的API文档工具,帮助团队更高效地协作,简化API文档生成流程。 ... [详细]
  • 本文探讨了在构建应用程序时,如何对不同类型的数据进行结构化设计。主要分为三类:全局配置、用户个人设置和用户关系链。每种类型的数据都有其独特的用途和应用场景,合理规划这些数据结构有助于提升用户体验和系统的可维护性。 ... [详细]
  • 气象对比分析
    本文探讨了不同地区和时间段的天气模式,通过详细的图表和数据分析,揭示了气候变化的趋势及其对环境和社会的影响。 ... [详细]
  • 深入解析动态代理模式:23种设计模式之三
    在设计模式中,动态代理模式是应用最为广泛的一种代理模式。它允许我们在运行时动态创建代理对象,并在调用方法时进行增强处理。本文将详细介绍动态代理的实现机制及其应用场景。 ... [详细]
  • 通常情况下,修改my.cnf配置文件后需要重启MySQL服务才能使新参数生效。然而,通过特定命令可以在不重启服务的情况下实现配置的即时更新。本文将详细介绍如何在线调整MySQL配置,并验证其有效性。 ... [详细]
  • Python自动化测试入门:Selenium环境搭建
    本文详细介绍如何在Python环境中安装和配置Selenium,包括开发工具PyCharm的安装、Python环境的设置以及Selenium包的安装方法。此外,还提供了编写和运行第一个自动化测试脚本的步骤。 ... [详细]
  • 本文详细介绍如何在 iOS 7 环境下申请苹果开发者账号,涵盖从访问开发者网站到最终激活账号的完整流程。包括选择个人或企业账号类型、付款方式及注意事项等。 ... [详细]
  • 本篇文章介绍如何将两个分别表示整数的链表进行相加,并生成一个新的链表。每个链表节点包含0到9的数值,如9-3-7和6-3相加得到1-0-0-0。通过反向处理链表、逐位相加并处理进位,最终再将结果链表反向,即可完成计算。 ... [详细]
  • 本文探讨了C++编程中理解代码执行期间复杂度的挑战,特别是编译器在程序运行时生成额外指令以确保对象构造、内存管理、类型转换及临时对象创建的安全性。 ... [详细]
  • 本文详细介绍了如何解决 Microsoft SQL Server 中用户 'sa' 登录失败的问题。错误代码为 18470,提示该帐户已被禁用。我们将通过 Windows 身份验证方式登录,并启用 'sa' 帐户以恢复其访问权限。 ... [详细]
  • ListView简单使用
    先上效果:主要实现了Listview的绑定和点击事件。项目资源结构如下:先创建一个动物类,用来装载数据:Animal类如下:packagecom.example.simplelis ... [详细]
  • CentOS 系统管理基础
    本文介绍了如何在 CentOS 中查询系统版本、内核版本、位数以及磁盘分区的相关知识。通过这些命令,用户可以快速了解系统的配置和磁盘结构。 ... [详细]
  • 本文详细探讨了 PHP 中 method_exists() 和 is_callable() 函数的区别,帮助开发者更好地理解和使用这两个函数。文章不仅解释了它们的功能差异,还提供了代码示例和应用场景的分析。 ... [详细]
author-avatar
qzq9037091
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有