热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

将KafKa的偏移量写入Mysql

packageSparkStreamingKafKa.OffSetMysqlimport

package SparkStreamingKafKa.OffSetMysql
import java.sql.{DriverManager, ResultSet}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import scala.collection.mutable
object StreamingKafkaWCMysqlOffset1 {
//设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
//conf 本地运行设置
val conf: SparkCOnf= new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getSimpleName)
//SparkStreaming
val ssc: StreamingCOntext= new StreamingContext(conf, Seconds(3))
val groupId = "hello_topic_group0"
// kafka的参数配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "Linux00:9092,Linux01:9092,Linux04:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topic = "he8"
val topics = Array(topic)
val config: COnfig= ConfigFactory.load()
// 需要设置偏移量的值
val offsets = mutable.HashMap[TopicPartition, Long]()
val conn1 = DriverManager.getConnection(config.getString("db.url"), config.getString("db.user"), config.getString("db.password"))
val pstm = conn1.prepareStatement("select * from mysqloffset_copy where groupId = ? and topic = ? ")
pstm.setString(1, groupId)
pstm.setString(2, topic)
val result: ResultSet = pstm.executeQuery()
while (result.next()) {
// 把数据库中的偏移量数据加载了
val p = result.getInt("partition")
val f = result.getInt("untilOffset")
// offsets += (new TopicPartition(topic,p)-> f)
val partition: TopicPartition = new TopicPartition(topic, p)
offsets.put(partition, f)
}
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
Subscribe[String, String](topics, kafkaParams,offsets)
)
//转换成RDD
stream.foreachRDD(rdd => {
//手动指定分区的地方
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
println("长度=" + ranges.length)
ranges.foreach(println)
val result: RDD[(String, Int)] = rdd.map(_.value()).map((_, 1)).reduceByKey(_ + _)
result.foreach(println)
result.foreachPartition(p => {
val jedis: Jedis = ToolsRedisMysql.getJedis()
p.foreach(t => {
jedis.hincrBy("wc1", t._1, t._2)
})
jedis.close()
})
val cOnn= DriverManager.getConnection(config.getString("db.url"), config.getString("db.user"), config.getString("db.password"))
// 把偏移量的Array 写入到mysql中
ranges.foreach(t => {
// 思考,需要保存哪些数据呢? 起始的offset不需要 还需要加上 groupid

val pstm = conn.prepareStatement("replace into mysqloffset_copy values (?,?,?,?)")
pstm.setString(1, t.topic)
pstm.setInt(2, t.partition)
pstm.setLong(3, t.untilOffset)
pstm.setString(4, groupId)
pstm.execute()
pstm.close()
})
})
ssc.start()
ssc.awaitTermination()
}
}

推荐阅读
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • 7.4 基本输入源
    一、文件流1.在spark-shell中创建文件流进入spark-shell创建文件流。另外打开一个终端窗口,启动进入spark-shell上面在spark-shell中执行的程序 ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  • Spark Streaming和Kafka整合之路(最新版本)
    2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少 ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
author-avatar
我是一颗菠菜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有