版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
传送门:大数据系列文章目录
官方网址:http://spark.apache.org/、 http://spark.apache.org/sql/
Apache Kafka 是目前最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景, Kafka基本是标配。 StructuredStreaming很好的集成Kafka,可以从Kafka拉取消息,然后就可以把流数据看做一个DataFrame, 一张无限增长的大表,在这个大表上做查询, Structured Streaming保证了端到端的 exactly-once,用户只需要关心业务即可,不用费心去关心底层是怎么做的。
StructuredStreaming集成Kafka,官方文档如下:
http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
目前仅支持Kafka 0.10.+版本及以上,底层使用Kafka New Consumer API拉取数据,如果公司Kafka版本为0.8.0版本, StructuredStreaming集成Kafka参考文档:
https://github.com/jerryshao/spark-kafka-0-8-sql
StructuredStreaming既可以从Kafka读取数据,又可以向Kafka 写入数据,添加Maven依赖:
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-sql-kafka-0-10_2.11artifactId>
<version>2.4.5version>
dependency>
kafka 数据消费
Kafka把生产者发送的数据放在不同的分区里面&#xff0c;这样就可以并行进行消费了。 每个分区里面的数据都是递增有序的&#xff0c;跟structured commit log类似&#xff0c;生产者和消费者使用Kafka 进行解耦&#xff0c;消费者不管你生产者发送的速率如何&#xff0c;只要按照一定的节奏进行消费就可以了。 每条消息在一个分区里面都有一个唯一的序列号offset&#xff08;偏移量&#xff09; &#xff0c; Kafka 会对内部存储的消息设置一个过期时间&#xff0c;如果过期了&#xff0c;就会标记删除&#xff0c;不管这条消息有没有被消费。
Kafka 可以被看成一个无限的流&#xff0c;里面的流数据是短暂存在的&#xff0c;如果不消费&#xff0c;消息就过期滚动没了。涉及一个问题&#xff1a;如果开始消费&#xff0c;就要定一下从什么位置开始。
当第一次开始消费一个Kafka 流的时候&#xff0c;上述策略任选其一&#xff0c;如果之前已经消费了&#xff0c;而且做了
checkpoint &#xff0c;比如消费程序升级了&#xff0c;这时候就会从上次结束的位置开始继续消费。目前
StructuredStreaming和Flink框架从Kafka消费数据时&#xff0c;采用的就是上述的策略。
Structured Streaming消费Kafka数据&#xff0c;采用的是poll方式拉取数据&#xff0c;与Spark Streaming中New
Consumer API集成方式一致。从Kafka Topics中读取消息&#xff0c;需要指定数据源&#xff08;kafka&#xff09;、 Kafka集群
的连接地址&#xff08;kafka.bootstrap.servers&#xff09;、消费的topic&#xff08;subscribe或subscribePattern&#xff09; &#xff0c; 指定topic
的时候&#xff0c;可以使用正则来指定&#xff0c;也可以指定一个 topic 的集合。
官方提供三种方式从Kafka topic中消费数据&#xff0c;主要区别在于每次消费Topic名称指定的不同
从Kafka 获取数据后Schema字段信息如下&#xff0c;既包含数据信息有包含元数据信息&#xff1a;
在实际开发时&#xff0c;往往需要获取每条数据的消息&#xff0c;存储在value字段中&#xff0c;由于是binary类型&#xff0c;需要转换为字符串String类型&#xff1b;此外了方便数据操作&#xff0c;通常将获取的key和value的DataFrame转换为Dataset强类型&#xff0c;伪代码如下&#xff1a;
从Kafka数据源读取数据时&#xff0c;可以设置相关参数&#xff0c;包含必须参数和可选参数&#xff1a;
范例演示&#xff1a; 从Kafka消费数据&#xff0c;进行词频统计&#xff0c; Topic为wordsTopic。
# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
## &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61; wordsTopic &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;
# 查看Topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.10.10:2181/kafka200
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.10:2181/kafka200 --replicatio
n-factor 1 --partitions 3 --topic wordsTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.10:9092 --topic wordsTopic
package kafka.source
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* &#64;author lwh
* &#64;date 2022/10/10
* &#64;description
**/
object StructuredKafkaSource {
def main(args: Array[String]): Unit &#61; {
// 构建SparkSession实例对象
val spark: SparkSession &#61; SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
// 设置Shuffle分区数目
.config("spark.sql.shuffle.partitions", "3")
.getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 从Kafka读取数据&#xff0c;底层采用New Consumer API
val kafkaStreamDF: DataFrame &#61; spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node01:9092")
.option("subscribe", "wordsTopic")
// TODO: 设置每批次消费数据最大值
.option("maxOffsetsPerTrigger", "100000")
.load()
// TODO: 进行词频统计
val resultStreamDF: DataFrame &#61; kafkaStreamDF
// 获取value字段的值&#xff0c;转换为String类型
.selectExpr("CAST(value AS STRING)")
// 转换为Dataset类型
.as[String]
// 过滤数据
.filter(line &#61;> null !&#61; line && line.trim.length > 0)
// 分割单词
.flatMap(line &#61;> line.trim.split("\\s&#43;"))
// 按照单词分组&#xff0c;聚合
.groupBy($"value").count()
// 设置Streaming应用输出及启动
val query: StreamingQuery &#61; resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console").option("numRows", "10").option("truncate", "false")
.start()
query.awaitTermination() // 查询器等待流式应用终止
query.stop() // 等待所有任务运行完成才停止运行
}
}
kafka 接收器
往Kafka里面写数据类似读取数据&#xff0c; 可以在DataFrame上调用writeStream来写入Kafka&#xff0c;设置参数指定value&#xff0c;其中key是可选的&#xff0c;如果不指定就是null。如果key为null&#xff0c;有时候可能导致分区数据不均匀。
将DataFrame写入Kafka时&#xff0c; Schema信息中所需的字段&#xff1a;
需要写入哪个topic&#xff0c;可以像上述所示在操作DataFrame 的时候在每条record上加一列topic字段指定&#xff0c; 也可以在DataStreamWriter上指定option配置。
写入数据至Kafka&#xff0c;需要设置Kafka Brokers地址信息及可选配置&#xff1a;
官方提供示例代码如下&#xff1a;
在实际实时流式项目中&#xff0c;无论使用Storm、 SparkStreaming、 Flink及Structured Streaming处理流式数据时&#xff0c;往往先从Kafka 消费原始的流式数据&#xff0c;经过ETL后将其存储到Kafka Topic中&#xff0c;以便其他业务相关应用消费数据&#xff0c;实时处理分析&#xff0c;技术架构流程图如下所示&#xff1a;
接下来模拟产生运营商基站数据&#xff0c;实时发送到Kafka 中&#xff0c;使用StructuredStreaming消费&#xff0c;经过ETL&#xff08;获取通话状态为success数据&#xff09;后&#xff0c;写入Kafka中&#xff0c;便于其他实时应用消费处理分析。
模拟产生运营商基站通话日志数据&#xff0c;封装到样例类中&#xff0c;字段信息如下&#xff1a;
package kafka.mock
/**
* 基站通话日志数据&#xff0c;字段如下&#xff1a;
*
* &#64;param stationId 基站标识符ID
* &#64;param callOut 主叫号码
* &#64;param callIn 被叫号码
* &#64;param callStatus 通话状态
* &#64;param callTime 通话时间
* &#64;param duration 通话时长
*/
case class StationLog(
stationId: String, //
callOut: String, //
callIn: String, //
callStatus: String, //
callTime: Long, //
duration: Long //
){
override def toString: String &#61; {
s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
}
}
创建Topic&#xff0c;相关命令如下&#xff1a;
# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
## &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61; stationTopic &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.10:2181/kafka200 --replicationfactor 1 --partitions 3 --topic stationTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.10:9092 --topic stationTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.10:9092 --topic stationTopic --from-beginning
## &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61; etlTopic &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.10:2181/kafka200 --replicationfactor 1 --partitions 3 --topic etlTopic
# 模拟生产者
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.10:9092 --topic etlTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.10:9092 --topic etlTopic --from-beginning
编写代码&#xff0c;实时产生日志数据&#xff0c;发送Kafka Topic&#xff1a;
package kafka.mock
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
/**
* 模拟产生基站日志数据&#xff0c;实时发送Kafka Topic中&#xff0c;数据字段信息&#xff1a;
* 基站标识符ID, 主叫号码, 被叫号码, 通话状态, 通话时间&#xff0c;通话时长
*/
object MockStationLog {
def main(args: Array[String]): Unit &#61; {
// 发送Kafka Topic
val props &#61; new Properties()
props.put("bootstrap.servers", "node01:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer &#61; new KafkaProducer[String, String](props)
val random &#61; new Random()
val allStatus &#61;Array(
"fail", "busy", "barring", "success", "success", "success",
"success", "success", "success", "success", "success", "success"
)
while (true){
val callOut: String &#61; "1860000%04d".format(random.nextInt(10000))
val callIn: String &#61; "1890000%04d".format(random.nextInt(10000))
val callStatus: String &#61; allStatus(random.nextInt(allStatus.length))
val callDuration &#61; if("success".equals(callStatus)) (1 &#43; random.nextInt(10)) * 1000L else 0L
// 随机产生一条基站日志数据
val stationLog: StationLog &#61; StationLog(
"station_" &#43; random.nextInt(10), //
callOut, //
callIn, //
callStatus, //
System.currentTimeMillis(), //
callDuration //
)
println(stationLog.toString)
Thread.sleep(100 &#43; random.nextInt(100))
//生产写入Kafka
val record &#61; new ProducerRecord[String, String]("stationTopic", stationLog.toString)
producer.send(record)
}
producer.close() // 关闭连接
}
}
运行程序&#xff0c;基站通话日志数据格式如下&#xff1a;
station_7,18600009710,18900000269,success,1590709965144,4000
station_6,18600003894,18900000028,success,1590709965333,8000
station_7,18600007207,18900001057,busy,1590709965680,0
编写应用实时从Kafka的【stationTopic】消费数据&#xff0c;经过处理分析后&#xff0c;存储至Kafka的【etlTopic】&#xff0c;其中需要设置检查点目录&#xff0c;保证应用一次且仅一次的语义。
package kafka.sink
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* &#64;ClassName StructKafkaSink
* &#64;Description TODO 使用结构化流生产数据到Kafka
* &#64;Create By Frank
*/
object StructKafkaSink {
def main(args: Array[String]): Unit &#61; {
//todo:1-构建SparkSession
val spark &#61; SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions",2)
.getOrCreate()
//修改日志级别
spark.sparkContext.setLogLevel("WARN")
//导包
import spark.implicits._
import org.apache.spark.sql.functions._
//todo:2-处理数据
//step1:读取数据
val kafkaData &#61; spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
.option("subscribe", "stationTopic")
.load()
//step2:处理数据
val rsData: Dataset[String] &#61; kafkaData
.selectExpr("CAST(value AS STRING)") //取出value
.as[String] //将Value的值转换为String类型
.filter(line &#61;> line !&#61; null && line.trim.split(",").length &#61;&#61; 6 && line.trim.split(",")(3) &#61;&#61; "success")
//step3:保存结果
val query &#61; rsData
.writeStream
.outputMode(OutputMode.Append()) //注意&#xff1a;ETL没有聚合&#xff0c;所以只能使用Append
.format("kafka")
.option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
.option("topic", "etlTopic") //要求&#xff1a;必须要有value这一列
.option("checkpointLocation","datastruct/kafka/chk1")
.start()
//todo:3-启动并持久运行
query.awaitTermination()
query.stop()
}
}
Kafka 特定配置
从Kafka消费数据时&#xff0c;相关配置属性可以通过带有kafka.prefix的DataStreamReader.option进行设置&#xff0c;例如前面设置Kafka Brokers地址属性&#xff1a; stream.option(“kafka.bootstrap.servers”, “host:port”)&#xff0c;更多关于Kafka 生产者Producer Config配置属和消费者Consumer Config配置属性&#xff0c;参考文档&#xff1a;
http://kafka.apache.org/20/documentation.html#producerconfigs
http://kafka.apache.org/20/documentation.html#newconsumerconfigs
注意以下Kafka参数属性可以不设置&#xff0c; 如果设置的话&#xff0c; Kafka source或者sink可能会抛出错误&#xff1a;
group.id&#xff1a; Kafka source将会自动为每次查询创建唯一的分组ID&#xff1b;
auto.offset.reset&#xff1a; 在将source选项startingOffsets设置为指定从哪里开始。 结构化流管理
内部消费的偏移量&#xff0c;而不是依赖Kafka消费者来完成。这将确保在topic/partitons动态订阅时不
会遗漏任何数据。注意&#xff0c; 只有在启动新的流式查询时才会应用startingOffsets&#xff0c;并且恢复操作
始终会从查询停止的位置启动&#xff1b;
key.deserializer/value.deserializer&#xff1a; Keys/Values总是被反序列化为ByteArrayDeserializer
的字节数组&#xff0c;使用DataFrame操作显式反序列化keys/values&#xff1b;
key.serializer/value.serializer&#xff1a; keys/values总是使用ByteArraySerializer或StringSerializer
进行序列化&#xff0c;使用DataFrame操作将keysvalues/显示序列化为字符串或字节数组&#xff1b;
enable.auto.commit&#xff1a; Kafka source不提交任何offset&#xff1b;