热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

7.4基本输入源

一、文件流1.在spark-shell中创建文件流进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell上面在spark-shell中执行的程序

一、文件流

技术分享图片


1.在spark-shell中创建文件流

技术分享图片

进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell 

技术分享图片

上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:

技术分享图片

在“/usr/local/spark/mycode/streaming/logfile”目录下新建一个log.txt文件,就可以在监听窗口中显示词频统计结果


2.采用独立应用程序方式创建文件流

在当前streaming下创建三级子目录,因为只有把代码放到src/main/scala目录下,sbt打包编译工具才能够正确运行。

技术分享图片

用vim编辑器新建一个TestStreaming.scala代码文件,请在里面输入以下代码:


import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
def main(args: Array[String]) {
val sparkCOnf= new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据
val ssc = new StreamingContext(sparkConf, Seconds(2))// 时间间隔为2秒
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile") //这里采用本地文件,当然你也可以采用HDFS文件
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

技术分享图片

技术分享图片

在simple.sbt文件中输入以下代码: 技术分享图片

执行sbt打包编译的命令如下:

技术分享图片

 打包成功以后,就可以输入以下命令启动这个程序:

技术分享图片



  1. 执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口)

  2. 切换到另外一个Shell窗口,在"/usr/local/spark/mycode/streaming/logfile"目录下再新建一个log2.txt文件,文件里面随便输入一些单词,保存好文件退出vim编辑器

  3. 再次切换回“监听窗口”,等待20秒以后,按键盘Ctrl+C或者Ctrl+D停止监听程序,就可以看到监听窗口的屏幕上会打印出单词统计信息


二、套接字流

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理


1.Socket工作原理

技术分享图片


2.使用套接字流作为数据源

技术分享图片

请在NetworkWordCount.scala文件中输入如下内容:(客户端向服务端发起连接,需要告诉它向哪个主机哪个端口发起连接)


package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length <2) {
System.err.println("Usage: NetworkWordCount ")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels() # 设置日志显示级别
val sparkCOnf= new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]") #生成sparkConf对象
val ssc = new StreamingContext(sparkConf, Seconds(1)) #生成一个StreamingContext对象
   # 1.定义输入数据流,args(0)是TCP服务端的主机名,args(1)是TCP服务端的端口号(字符串转整数)
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)#保存数据方式
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

技术分享图片技术分享图片

在相同目录下再新建另外一个代码文件StreamingExamples.scala,文件内容如下:


package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging { # 单例对象,不需要实例化,直接用它的静态方法
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark‘s default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}

技术分享图片

技术分享图片技术分享图片技术分享图片

 新打开一个窗口作为nc窗口,启动nc程序:

技术分享图片

可以在nc窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口每隔1秒就会打印出词频统计信息,大概会在屏幕上出现类似如下的结果:

技术分享图片


3.使用Socket编程实现自定义数据源

下面我们再前进一步,把数据源头的产生方式修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。

技术分享图片

 

 

技术分享图片

 

 

 技术分享图片

 

 

 


三、RDD队列流


推荐阅读
  • 本文深入解析了Java面向对象编程的核心概念及其应用,重点探讨了面向对象的三大特性:封装、继承和多态。封装确保了数据的安全性和代码的可维护性;继承支持代码的重用和扩展;多态则增强了程序的灵活性和可扩展性。通过具体示例,文章详细阐述了这些特性在实际开发中的应用和优势。 ... [详细]
  • 单链表的高效遍历及性能优化策略
    本文探讨了单链表的高效遍历方法及其性能优化策略。在单链表的数据结构中,插入操作的时间复杂度为O(n),而遍历操作的时间复杂度为O(n^2)。通过在 `LinkList.h` 和 `main.cpp` 文件中对单链表进行封装,我们实现了创建和销毁功能的优化,提高了单链表的使用效率。此外,文章还介绍了几种常见的优化技术,如缓存节点指针和批量处理,以进一步提升遍历性能。 ... [详细]
  • ButterKnife 是一款用于 Android 开发的注解库,主要用于简化视图和事件绑定。本文详细介绍了 ButterKnife 的基础用法,包括如何通过注解实现字段和方法的绑定,以及在实际项目中的应用示例。此外,文章还提到了截至 2016 年 4 月 29 日,ButterKnife 的最新版本为 8.0.1,为开发者提供了最新的功能和性能优化。 ... [详细]
  • 在 CentOS 6.5 系统上部署 VNC 服务器的详细步骤与配置指南
    在 CentOS 6.5 系统上部署 VNC 服务器时,首先需要确认 VNC 服务是否已安装。通常情况下,VNC 服务默认未安装。可以通过运行特定的查询命令来检查其安装状态。如果查询结果为空,则表明 VNC 服务尚未安装,需进行手动安装。此外,建议在安装前确保系统的软件包管理器已更新至最新版本,以避免兼容性问题。 ... [详细]
  • iOS 设备唯一标识获取的高效解决方案与实践
    在iOS 7中,苹果公司再次禁止了对MAC地址的访问,使得开发者无法直接获取设备的物理地址。为了在开发过程中实现设备的唯一标识,苹果推荐使用Keychain服务来存储和管理唯一的标识符。此外,还可以结合其他技术手段,如UUID和广告标识符(IDFA),以确保设备的唯一性和安全性。这些方法不仅能够满足应用的需求,还能保护用户的隐私。 ... [详细]
  • 本文探讨了如何有效地构建和优化微信公众平台账号,涵盖了用户信息管理、内容创作与发布、互动策略及数据分析等方面。通过合理设置用户信息字段,如用户名、昵称、密码、真实姓名和性别等,确保账号的安全性和用户体验。同时,文章还介绍了如何利用微信公众平台的各项功能,提升用户参与度和品牌影响力。 ... [详细]
  • 并发编程入门:初探多任务处理技术
    并发编程入门:探索多任务处理技术并发编程是指在单个处理器上高效地管理多个任务的执行过程。其核心在于通过合理分配和协调任务,提高系统的整体性能。主要应用场景包括:1) 将复杂任务分解为多个子任务,并分配给不同的线程,实现并行处理;2) 通过同步机制确保线程间协调一致,避免资源竞争和数据不一致问题。此外,理解并发编程还涉及锁机制、线程池和异步编程等关键技术。 ... [详细]
  • 在 POJ1651 的乘法谜题挑战中,如果选手按相反顺序选择卡片,即先选 50,再选 20,最后选 1,则最终得分会有所不同。题目要求输入的第一行包含... 改写后的摘要:在 POJ1651 的乘法谜题挑战中,如果选手按照逆序选取卡片,例如依次选择 50、20 和 1,最终的得分将发生变化。题目首先要求输入的第一行包括... ... [详细]
  • 资源管理器的基础架构包括三个核心组件:1)资源池,用于将CPU和内存等资源分配给不同的容器;2)负载组,负责承载任务并将其分配到相应的资源池;3)分类函数,用于将不同的会话映射到合适的负载组。该系统提供了两种主要的资源管理策略。 ... [详细]
  • AngularJS 进阶指南:第三部分深入解析
    在本文中,我们将深入探讨 AngularJS 的指令模型,特别是 `ng-model` 指令。`ng-model` 指令用于将 HTML 元素与应用程序数据进行双向绑定,支持多种数据类型验证,如数字、电子邮件地址和必填项检查。此外,我们还将介绍如何利用该指令优化表单验证和数据处理流程,提升开发效率和用户体验。 ... [详细]
  • Android中将独立SO库封装进JAR包并实现SO库的加载与调用
    在Android开发中,将独立的SO库封装进JAR包并实现其加载与调用是一个常见的需求。本文详细介绍了如何将SO库嵌入到JAR包中,并确保在外部应用调用该JAR包时能够正确加载和使用这些SO库。通过这种方式,开发者可以更方便地管理和分发包含原生代码的库文件,提高开发效率和代码复用性。文章还探讨了常见的问题及其解决方案,帮助开发者避免在实际应用中遇到的坑。 ... [详细]
  • 开发了一款Windows API查看器,支持VBA语句导出,并提供超过两万个API的MSDN链接查询功能。
    开发了一款名为Windows API Viewer的工具,支持导出VBA语句,并集成了超过两万个API的MSDN链接查询功能,方便用户快速查找和使用相关API信息。 ... [详细]
  • 男性健康问题常常被忽视,许多人对疾病持轻视态度,即使出现症状也往往置之不理,认为会自行好转。然而,现代男性在健康管理方面应当重视医生的专业建议。以下是十个关键点,包括但不限于:胸口疼痛应及时就医、定期进行体检、保持合理饮食和适量运动等,以维护整体健康。 ... [详细]
  • 本指南旨在帮助Swoole初学者快速掌握异步并发编程的基本概念和实践方法。通过实例演示,我们将使用Swoole PHP扩展构建一个简单的客户端与服务器模型,并实现基本的通信功能。首先,我们将从客户端的实现入手(文件名为:client.php)。 ... [详细]
  • 在 Visual Studio 中,未选中文本时,使用 `Ctrl+X` 可以剪切并删除当前行,适用于快速删除整行代码;`Ctrl+C` 用于复制当前行的代码;`Ctrl+L` 则用于删除当前行。此外,通过组合键 `Ctrl+K, Ctrl+C` 可以注释选定的代码行,提升代码编辑效率。这些快捷键和技巧能够显著提高开发者的生产力,建议开发者熟练掌握并灵活运用。 ... [详细]
author-avatar
qzq9037091
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有