为什么80%的码农都做不了架构师?>>>
目的
- 使用 NSQ 作为消息流
- 使用 Spark Streaming 进行消费
- 对数据进行清洗后，保存到 Hive 数据仓库中
连接方案
1、编写 Spark Streaming 自定义接收器（Custom Receiver），详见 Spark 官方文档《Spark Streaming Custom Receivers》
2、使用社区维护的 NSQ Java 客户端库 JavaNSQClient（包名 com.github.brainlag.nsq）连接 NSQ，详见该项目的文档
详细代码
自定义接收器（Receiver）
ReliableNSQReceiver.scala
import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
 * NSQ message handler that decodes each message body as UTF-8 text and passes it to `store_fun`.
 *
 * A message is acknowledged with `finished()` only after `store_fun` returns normally. If
 * `store_fun` throws a non-fatal exception, the message is requeued instead, so NSQ can
 * redeliver it rather than silently losing it.
 *
 * @param store_fun sink for each decoded message body (the receiver passes its `store` method)
 */
class MessageCallbacks(store_fun: String => Unit) extends NSQMessageCallback with Logging {

  /** Decodes, stores and then acknowledges (or requeues) a single NSQ message. */
  def message(message: NSQMessage): Unit = {
    // Decode explicitly; `new String(bytes)` would depend on the JVM's platform default charset.
    val body = new String(message.getMessage(), StandardCharsets.UTF_8)

    val stored =
      try {
        store_fun(body)
        true
      } catch {
        case NonFatal(e) =>
          logWarning("Failed to store NSQ message; requeueing it", e)
          false
      }

    // Ack outside the try so a failure while acknowledging can never also requeue the message.
    if (stored) message.finished() else message.requeue()
  }
}
/**
 * Custom Spark Streaming receiver that consumes an NSQ `topic`/`channel`, locating nsqd nodes
 * through the nsqlookupd at `host:port`.
 *
 * Received data is stored with `StorageLevel.MEMORY_AND_DISK_2`; each message body is handed to
 * this receiver's `store` through [[MessageCallbacks]].
 *
 * NOTE(review): Spark's custom-receiver guide classifies receivers that call the single-record
 * `store(item)` as unreliable (records are buffered before being saved). For reliable delivery,
 * consider batching and using the blocking multi-record `store(...)` before acknowledging.
 *
 * @param host    nsqlookupd host
 * @param port    nsqlookupd port
 * @param topic   NSQ topic to subscribe to
 * @param channel NSQ channel to consume from
 */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  // Assigned on the receiving thread and read by onStop(), which Spark invokes from another thread.
  @volatile var consumer: NSQConsumer = null

  /** Starts the consumer on a separate thread so that onStart() itself does not block. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Closes the NSQ consumer, if one was created. */
  def onStop(): Unit = {
    logInfo("Stopped receiving")
    // consumer is still null when receive() failed (or has not run yet) before creating it.
    Option(consumer).foreach(_.close())
    consumer = null
  }

  /** Connects to nsqlookupd and starts consuming; on failure asks Spark to restart the receiver. */
  private def receive(): Unit = {
    try {
      val lookup = new DefaultNSQLookup
      lookup.addLookupAddress(host, port)
      val nsqConsumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))
      consumer = nsqConsumer
      nsqConsumer.start()
    } catch {
      case e: java.net.ConnectException =>
        restart(s"Error connecting to $host:$port", e)
      case NonFatal(t) =>
        restart("Error receiving data", t)
    }
  }
}
使用接收器
import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*
* 在定义一个 context 之后，您必须执行以下操作：
* 通过创建输入 DStreams 来定义输入源.
* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).
* 开始接收输入并且使用 streamingContext.start() 来处理数据.
* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).
* 使用 streamingContext.stop() 来手动的停止处理.*/object ELKStreaming extends Logging{def main(args: Array[String]): Unit &#61;{if (args.length <4) {System.err.println("Usage: ELKStreaming
}