热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkstreaming连接消费nsq

为什么80%的码农都做不了架构师?spark-streaming连接消费nsq目的使用NSQ作为消息流使用spark-streaming进行消费对数据进行清洗后

为什么80%的码农都做不了架构师?>>>   hot3.png

spark-streaming连接消费nsq

目的

  • 使用 NSQ作为消息流
  • 使用 spark-streaming 进行消费
  • 对数据进行清洗后,保存到hive仓库中

连接方案

1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器),详细见文档

2、使用 nsq 官方提供的Java程序连接包 JavaNSQClient ,详细见文档

详细代码

自定义连接器

ReliableNSQReceiver.scala

import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverclass MessageCallbacks(store_fun:String => Unit) extends NSQMessageCallback with Logging {def message(message: NSQMessage): Unit ={val s = new String(message.getMessage())store_fun(s)message.finished()}
}
/* 自定义连接器 */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {var consumer: NSQConsumer = nulldef onStart() {// 启动通过连接接收数据的线程new Thread("Socket Receiver") {override def run() { receive() }}.start()}def onStop() {logInfo("Stopped receiving")consumer.close}/** 接收数据 */private def receive() {try {val lookup = new DefaultNSQLookuplookup.addLookupAddress(host, port)consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))consumer.start} catch {case e: java.net.ConnectException =>restart("Error connecting to " + host + ":" + port, e)case t: Throwable =>restart("Error receiving data", t)}}}

使用连接器

import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*
* 在定义一个 context 之后,您必须执行以下操作.* 通过创建输入 DStreams 来定义输入源.
* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).
* 开始接收输入并且使用 streamingContext.start() 来处理数据.
* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).
* 使用 streamingContext.stop() 来手动的停止处理.*/object ELKStreaming extends Logging{def main(args: Array[String]): Unit &#61;{if (args.length <4) {System.err.println("Usage: ELKStreaming ")System.exit(1)}logInfo("start &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;>")StreamingExamples.setStreamingLogLevels()val sparkConf &#61; new SparkConf().setAppName("ELKStreaming").setMaster("yarn").set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")// 创建一个批次间隔为10val ssc &#61; new StreamingContext(sparkConf, Seconds(args(2).toInt))// 使用自定义的NSQReceiverval lines &#61; ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))val hiveStream: DStream[(String, String)] &#61; lines.map(line &#61;> prefix_exit(line))// 将计算后的数据保存到hive中hiveStream.foreachRDD(rdd &#61;> {// 利用SparkConf来初始化SparkSession。val sparkSession: SparkSession &#61; SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()// 导入隐式转换来将RDDimport sparkSession.implicits._// 将RDD转换成DFval df: DataFrame &#61; rdd.toDF("str", "ymd")// 取出表中的字段logInfo("df count &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;>"&#43; df.count)df.createOrReplaceTempView("spark_logs")sparkSession.sql("insert into "&#43;args(3)&#43;" partition (ymd) select str,ymd from spark_logs")})ssc.start()ssc.awaitTermination()}def prefix_exit(line:String):(String,String) &#61;{// 对数据进行清洗计算val obj &#61; new JsonParser().parse(line).getAsJsonObjectval data_str1 &#61; obj.get("recv_timestamp").toString().split("T|Z|\"")val data_str2 &#61; data_str1(1).split(&#39;-&#39;)val data_str3 &#61; data_str2(1)&#43;"/"&#43;data_str2(2)&#43;"/"&#43;data_str2(0)&#43;" "&#43;data_str1(2)&#43;" [I] "&#43;obj.get("index_type").toString().split("\"")(1)&#43;" "&#43;lineval data_str4 &#61; data_str2(0)&#43;data_str2(1)&#43;data_str2(2)(data_str3.toString(), data_str4.toString())}
}


转:https://my.oschina.net/2devil/blog/3003109



推荐阅读
  • 在CentOS上部署和配置FreeSWITCH
    在CentOS系统上部署和配置FreeSWITCH的过程涉及多个步骤。本文详细介绍了从源代码安装FreeSWITCH的方法,包括必要的依赖项安装、编译和配置过程。此外,还提供了常见的配置选项和故障排除技巧,帮助用户顺利完成部署并确保系统的稳定运行。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
  • 在 Vue 应用开发中,页面状态管理和跨页面数据传递是常见需求。本文将详细介绍 Vue Router 提供的两种有效方式,帮助开发者高效地实现页面间的数据交互与状态同步,同时分享一些最佳实践和注意事项。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 本文介绍了如何在iOS平台上使用GLSL着色器将YV12格式的视频帧数据转换为RGB格式,并展示了转换后的图像效果。通过详细的技术实现步骤和代码示例,读者可以轻松掌握这一过程,适用于需要进行视频处理的应用开发。 ... [详细]
  • 在过去,我曾使用过自建MySQL服务器中的MyISAM和InnoDB存储引擎(也曾尝试过Memory引擎)。今年初,我开始转向阿里云的关系型数据库服务,并深入研究了其高效的压缩存储引擎TokuDB。TokuDB在数据压缩和处理大规模数据集方面表现出色,显著提升了存储效率和查询性能。通过实际应用,我发现TokuDB不仅能够有效减少存储成本,还能显著提高数据处理速度,特别适用于高并发和大数据量的场景。 ... [详细]
  • 深入解析 Vue 中的 Axios 请求库
    本文深入探讨了 Vue 中的 Axios 请求库,详细解析了其核心功能与使用方法。Axios 是一个基于 Promise 的 HTTP 客户端,支持浏览器和 Node.js 环境。文章首先介绍了 Axios 的基本概念,随后通过具体示例展示了如何在 Vue 项目中集成和使用 Axios 进行数据请求。无论你是初学者还是有经验的开发者,本文都能为你解决 Vue.js 相关问题提供有价值的参考。 ... [详细]
  • 揭秘腾讯云CynosDB计算层设计优化背后的不为人知的故事与技术细节
    揭秘腾讯云CynosDB计算层设计优化背后的不为人知的故事与技术细节 ... [详细]
  • 解决MySQL 5.1服务器无法正确识别中文字符的问题
    在使用MySQL 5.1服务器时,可能会遇到无法正确识别中文字符的问题。由于相关资料较少且不够全面,本文将详细阐述解决方案。首先,需要检查MySQL的配置文件,确保字符集设置正确,并通过命令行工具验证当前的字符编码配置。此外,建议更新到最新版本以避免此类问题。 ... [详细]
author-avatar
金婉jessica氵_573
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有