热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkstreaming连接消费nsq

为什么80%的码农都做不了架构师?spark-streaming连接消费nsq目的使用NSQ作为消息流使用spark-streaming进行消费对数据进行清洗后

为什么80%的码农都做不了架构师?>>>   hot3.png

spark-streaming连接消费nsq

目的

  • 使用 NSQ作为消息流
  • 使用 spark-streaming 进行消费
  • 对数据进行清洗后,保存到hive仓库中

连接方案

1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器),详细见文档

2、使用 nsq 官方提供的Java程序连接包 JavaNSQClient ,详细见文档

详细代码

自定义连接器

ReliableNSQReceiver.scala

import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverclass MessageCallbacks(store_fun:String => Unit) extends NSQMessageCallback with Logging {def message(message: NSQMessage): Unit ={val s = new String(message.getMessage())store_fun(s)message.finished()}
}
/* 自定义连接器 */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {var consumer: NSQConsumer = nulldef onStart() {// 启动通过连接接收数据的线程new Thread("Socket Receiver") {override def run() { receive() }}.start()}def onStop() {logInfo("Stopped receiving")consumer.close}/** 接收数据 */private def receive() {try {val lookup = new DefaultNSQLookuplookup.addLookupAddress(host, port)consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))consumer.start} catch {case e: java.net.ConnectException =>restart("Error connecting to " + host + ":" + port, e)case t: Throwable =>restart("Error receiving data", t)}}}

使用连接器

import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*
* 在定义一个 context 之后,您必须执行以下操作.* 通过创建输入 DStreams 来定义输入源.
* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).
* 开始接收输入并且使用 streamingContext.start() 来处理数据.
* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).
* 使用 streamingContext.stop() 来手动的停止处理.*/object ELKStreaming extends Logging{def main(args: Array[String]): Unit &#61;{if (args.length <4) {System.err.println("Usage: ELKStreaming ")System.exit(1)}logInfo("start &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;>")StreamingExamples.setStreamingLogLevels()val sparkConf &#61; new SparkConf().setAppName("ELKStreaming").setMaster("yarn").set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")// 创建一个批次间隔为10val ssc &#61; new StreamingContext(sparkConf, Seconds(args(2).toInt))// 使用自定义的NSQReceiverval lines &#61; ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))val hiveStream: DStream[(String, String)] &#61; lines.map(line &#61;> prefix_exit(line))// 将计算后的数据保存到hive中hiveStream.foreachRDD(rdd &#61;> {// 利用SparkConf来初始化SparkSession。val sparkSession: SparkSession &#61; SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()// 导入隐式转换来将RDDimport sparkSession.implicits._// 将RDD转换成DFval df: DataFrame &#61; rdd.toDF("str", "ymd")// 取出表中的字段logInfo("df count &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;>"&#43; df.count)df.createOrReplaceTempView("spark_logs")sparkSession.sql("insert into "&#43;args(3)&#43;" partition (ymd) select str,ymd from spark_logs")})ssc.start()ssc.awaitTermination()}def prefix_exit(line:String):(String,String) &#61;{// 对数据进行清洗计算val obj &#61; new JsonParser().parse(line).getAsJsonObjectval data_str1 &#61; obj.get("recv_timestamp").toString().split("T|Z|\"")val data_str2 &#61; data_str1(1).split(&#39;-&#39;)val data_str3 &#61; data_str2(1)&#43;"/"&#43;data_str2(2)&#43;"/"&#43;data_str2(0)&#43;" "&#43;data_str1(2)&#43;" [I] "&#43;obj.get("index_type").toString().split("\"")(1)&#43;" "&#43;lineval data_str4 &#61; data_str2(0)&#43;data_str2(1)&#43;data_str2(2)(data_str3.toString(), data_str4.toString())}
}


转:https://my.oschina.net/2devil/blog/3003109



推荐阅读
  • 在Android中实现黑客帝国风格的数字雨效果
    本文将详细介绍如何在Android平台上利用自定义View实现类似《黑客帝国》中的数字雨效果。通过实例代码,我们将探讨如何设置文字颜色、大小,以及如何控制数字下落的速度和间隔。 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • Web动态服务器Python基本实现
    Web动态服务器Python基本实现 ... [详细]
  • 精选10款Python框架助力并行与分布式机器学习
    随着神经网络模型的不断深化和复杂化,训练这些模型变得愈发具有挑战性,不仅需要处理大量的权重,还必须克服内存限制等问题。本文将介绍10款优秀的Python框架,帮助开发者高效地实现分布式和并行化的深度学习模型训练。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文详细介绍了 Spark 中的弹性分布式数据集(RDD)及其常见的操作方法,包括 union、intersection、cartesian、subtract、join、cogroup 等转换操作,以及 count、collect、reduce、take、foreach、first、saveAsTextFile 等行动操作。 ... [详细]
  • ASP.NET 进度条实现详解
    本文介绍了如何在ASP.NET中使用HTML和JavaScript创建一个动态更新的进度条,并通过Default.aspx页面进行展示。 ... [详细]
  • 利用Node.js实现PSD文件的高效切图
    本文介绍了如何通过Node.js及其psd2json模块,快速实现PSD文件的自动化切图过程,以适应项目中频繁的界面更新需求。此方法不仅提高了工作效率,还简化了从设计稿到实际应用的转换流程。 ... [详细]
  • 本文详细介绍了在Windows系统中如何配置Nginx以实现高效的缓存加速功能,包括关键的配置文件设置和示例代码。 ... [详细]
  • 本文详细介绍了如何正确设置Shadowsocks公共代理,包括调整超时设置、检查系统限制、防止滥用及遵守DMCA法规等关键步骤。 ... [详细]
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • 本文档介绍了如何使用ESP32开发板在STA模式下实现与TCP服务器的通信,包括环境搭建、代码解析及实验步骤。 ... [详细]
  • 目录预备知识导包构建数据集神经网络结构训练测试精度可视化计算模型精度损失可视化输出网络结构信息训练神经网络定义参数载入数据载入神经网络结构、损失及优化训练及测试损失、精度可视化qu ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
author-avatar
金婉jessica氵_573
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有