热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkstreamingkafka之createDirectStream模式

完整工程用例最近一直在用directstream方式消费kafka中的数据,特此总结,整个代码工程分为三个部分一.完整工程代码如下(某些地方特意做了说明,这个代码的部分函数直接用的

完整工程用例

最近一直在用directstream方式消费kafka中的数据,特此总结,整个代码工程分为三个部分
一. 完整工程代码如下(某些地方特意做了说明, 这个代码的部分函数直接用的是spark-streaming-kafka-0.8_2.11)

package directStream
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
//import java.util._
import org.apache.spark.{SparkContext,SparkConf,TaskContext, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.{KafkaUtils,HasOffsetRanges, OffsetRange,KafkaCluster}
import com.typesafe.config.ConfigFactory
import scalikejdbc._
import scala.collection.JavaConverters._
object SetupJdbc {
def apply(driver: String, host: String, user: String, password: String): Unit = {
Class.forName(driver)
ConnectionPool.singleton(host, user, password)
}
}
object SimpleApp{
def main(args: Array[String]): Unit = {
val cOnf= ConfigFactory.load // 加载工程resources目录下application.conf文件,该文件中配置了databases信息,以及topic及group消息
val kafkaParams = Map[String, String](
"metadata.broker.list" -> conf.getString("kafka.brokers"),
"group.id" -> conf.getString("kafka.group"),
"auto.offset.reset" -> "smallest"

val jdbcDriver = conf.getString("jdbc.driver")
val jdbcUrl = conf.getString("jdbc.url")
val jdbcUser = conf.getString("jdbc.user")
val jdbcPassword = conf.getString("jdbc.password")
val topic = conf.getString("kafka.topics")
val group = conf.getString("kafka.group")
val ssc = setupSsc(kafkaParams, jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword,topic, group)()
ssc.start()
ssc.awaitTermination()
}
def createStream(taskOffsetInfo: Map[TopicAndPartition, Long], kafkaParams: Map[String, String], conf:SparkConf, ssc: StreamingContext, topics:String):InputDStream[_] = {
// 若taskOffsetInfo 不为空, 说明这不是第一次启动该任务, database已经保存了该topic下该group的已消费的offset, 则对比kafka中该topic有效的offset的最小值和数据库保存的offset,去比较大作为新的offset.
if(taskOffsetInfo.size != 0){
val kc = new KafkaCluster(kafkaParams)
val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(taskOffsetInfo.keySet)
if(earliestLeaderOffsets.isLeft)
throw new SparkException("get kafka partition failed:")
val earliestOffSets = earliestLeaderOffsets.right.get
val offsets = earliestOffSets.map(r =>
new TopicAndPartition(r._1.topic, r._1.partition) -> r._2.offset.toLong)
val newOffsets = taskOffsetInfo.map(r => {
val t = offsets(r._1)
if (t > r._2) {
r._1 -> t
} else {
r._1 -> r._2
}
}
)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => 1L
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](ssc, kafkaParams, newOffsets, messageHandler) //val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
} else {
val topicSet = topics.split(",").toSet
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams,topicSet)
}
} def setupSsc(
kafkaParams: Map[String, String],
jdbcDriver: String,
jdbcUrl: String,
jdbcUser: String,
jdbcPassword: String,
topics:String,
group:String
)(): StreamingCOntext= {
val cOnf= new SparkConf()
.setMaster("mesos://10.142.113.239:5050")
.setAppName("offset")
.set("spark.worker.timeout", "500")
.set("spark.cores.max", "10")
.set("spark.streaming.kafka.maxRatePerPartition", "500")
.set("spark.rpc.askTimeout", "600s")
.set("spark.network.timeout", "600s")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.task.maxFailures", "1")
.set("spark.speculationfalse", "false")
val ssc = new StreamingContext(conf, Seconds(5))
SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) // connect to mysql
// begin from the the offsets committed to the database
val fromOffsets = DB.readOnly { implicit session =>
sql"select topic, part, offset from streaming_task where group_id=$group".
map { resultSet =>
new TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
}.list.apply().toMap
}
val stream = createStream(fromOffsets, kafkaParams, conf, ssc, topics) stream.foreachRDD { rdd =>
if(rdd.count != 0){
// you task
val t = rdd.map(record => (record, 1))
val results = t.reduceByKey {_+_}.collect

// persist the offset into the database
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
DB.localTx { implicit session =>
offsetRanges.foreach { osr =>
sql"""replace into streaming_task values(${osr.topic}, ${group}, ${osr.partition}, ${osr.untilOffset})""".update.apply()
if(osr.partition == 0){
println(osr.partition, osr.untilOffset)
}
}
}
}
}
ssc
}
}

二. 工程的resources文件下的有个application.conf配置文件,其配置如下

jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/xxxx"
user = "xxxx"
password = "xxxx"
}
kafka {
topics = "xxxx"
brokers = "xxxx.xxx.xxx.:xxx,xxx.xxx.xxx.xxx:9092,xxx.xxxx.xxx:xxxx"
group = "xxxxxx"
}
jheckpointDir = "hdfs://xxx.xxx.xxx.xxx:9000/shouzhucheckpoint"
batchDuratiOnMs= xxxx

三. 配置文件中可以看到, 我把offset 保存在 mysql里,这里我定义了一个table 名称为streaming_task, 表的结构信息如下:

+----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| topic | varchar(100) | NO | PRI | NULL | |
| group_id | varchar(50) | NO | PRI | | |
| part | int(4) | NO | PRI | 0 | |
| offset | mediumtext | YES | | NULL | |
+----------+--------------+------+-----+---------+-------+

部分解释如下:

一. 选用direct 的原因
官方为spark提供了两种方式来消费kafka中的数据, 高阶api由kafka自己来来维护offset, 有篇blog总结的比较好

第一种是利用 Kafka 消费者高级 API 在 Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,数据会传输到 Spark 工作节点的执行器中,但是默认配置下这种方法在 Spark Job 出错时会导致数据丢失,如果要保证数据可靠性,需要在 Spark Streaming 中开启Write Ahead Logs(WAL),也就是上文提到的 Kafka 用来保证数据可靠性和一致性的数据保存方式。可以选择让 Spark 程序把 WAL 保存在分布式文件系统(比如 HDFS)中,

第二种方式不需要建立消费者线程,使用 createDirectStream 接口直接去读取 Kafka 的 WAL,将 Kafka 分区与 RDD 分区做一对一映射,相较于第一种方法,不需再维护一份 WAL 数据,提高了性能。读取数据的偏移量由 Spark Streaming 程序通过检查点机制自身处理,避免在程序出错的情况下重现第一种方法重复读取数据的情况,消除了 Spark Streaming 与 ZooKeeper/Kafka 数据不一致的风险。保证每条消息只会被 Spark Streaming 处理一次。以下代码片通过第二种方式读取 Kafka 中的数据:

在我在使用第一种方式的时候,如果数据量太大, 往往会出现报错,了解这这两种方式的不同后, 果断选用了第二种,

二. 引入KafkaCluster类的原因

引入KafkaCluster是为了在整个任务启动之前, 首先获取topic的有效的最旧offset. 这跟kafka的在实际的使用场景,大公司都是按时间删除kafka上数据有关,如果任务挂的时间太久,在还未能启动任务之前,database中保存的offset已经在kafak中失效,这时候为了最大程度的减少损失,只能从该topic的最旧数据开始消费..

三. 存入database的原因

看上面的代码,估计好多人也扒过KafkaCluster的源码, 这个类里面其实有一个setConsumerOffsets这样的方法, 其实在处理过一个batch的数据后, 更新一下该topic下group的offset即可,但是还是在开始启动这个 job 的时候还得验证该offset否有效. 貌似这样还不用外部数据库,岂不方便? 其实这样做确实挺方便,
有些场景下这样做无可厚非, 但我觉得: 如果处理完数据,要写到外部数据库, 此时,如果能把写数据和写offset放在一个事务中(前提是这个数据库是支持事务), 那么就可以即可保证严格消费一次

四. conf 中两个特殊设置设置

为了确保task不会重复执行请设置下面两个参数:

  • spark.task.maxFailures=1, Task重试次数为1,即不重试
  • spark.speculation=false 关闭推测执行, 重点说下这个参数spark.speculation这个参数表示空闲的资源节点会不会尝试执行还在运行,并且运行时间过长的Task,避免单个节点运行速度过慢导致整个任务卡在一个节点上。这个参数最好设置为true。与之相配合可以一起设置的参数有spark.speculation.×开头的参数(设置spark.speculation=true将执行事件过长的节点去掉并重新分配任务而spark.speculation.interval用来设置执行间隔)

推荐阅读
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 本文介绍了在iOS开发中使用UITextField实现字符限制的方法,包括利用代理方法和使用BNTextField-Limit库的实现策略。通过这些方法,开发者可以方便地限制UITextField的字符个数和输入规则。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • EPPlus绘制刻度线的方法及示例代码
    本文介绍了使用EPPlus绘制刻度线的方法,并提供了示例代码。通过ExcelPackage类和List对象,可以实现在Excel中绘制刻度线的功能。具体的方法和示例代码在文章中进行了详细的介绍和演示。 ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • 2019独角兽企业重金招聘Python工程师标准
    本文介绍了2019独角兽企业对Python工程师的招聘标准,包括在AndroidManifest中定义meta-data的方法和获取meta-data值的代码。同时提供了获取meta-data值的具体实现方法。转载文章链接:https://my.oschina.net/u/244918/blog/685127 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
author-avatar
快乐碧云轩在江湖
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有