作者:奇力0_843 | 来源:互联网 | 2023-08-29 23:41
概要本文将涉及StructuredStreaming的两种输入形式:socket、kafka,以及两种输出形式:memory、kafka。在低版本的Spark中要写入kafka需要
概要
- 本文将涉及StructuredStreaming的两种输入形式:
socket、kafka
,以及两种输出形式:memory
、kafka
。 - 在低版本的Spark中要写入
kafka
需要构建ForeachWriter
,高版本(2.3+)可以直接使用xxx.writeStream.format("kafka")
。
实例
- SparkStreaming 与 Socket
Spark Streaming基础学习【一】WordCount blog.csdn.net
2. StructuredStreaming 与 Socket
首先打开socket nc -lk 6009
(可能需要安装 yum install nc
)
并用telnet xx.xx.xx.xx 6009
监听这个端口,确认功能正常。
Spark-shell里如下:
// 加载数据 val loadDF = {
spark.readStream
.format("socket") // 备选参数有 socket, kafka, file .option("host","xx.xx.xx.xx") // 发信端的ip .option("port","6009") // 发信端的port .load() // 加载 }
// 指定输出到内存(即存在一张表里) loadDF.writeStream
.outputMode("append") // 备选参数有 update, complete 详细解释参见下方 .queryName("tmpTable") // query的名字 .format("memory") // 写到内存的一张表里,用query的名字作为表名 .start() // 开启 .awaitTermination() // 保持活跃直到socket端停止 // socket发信端传过数据后,这边显示一下 spark.sql("select * from tmpTable").show
结果如下:
3. StructuredStreaming 与 Kafka
首先创建kafka测试topic并“生产”消息。
kafka-console-producer --broker-list nodename:9092 --topic test.topic --property "parse.key=true" --property "key.separator=:"
然后spark侧“订阅”该topic
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "nodename:9092")
.option("subscribe", "test.topic")
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key as STRING)","CAST(value as STRING)")
.as[(String,String)]
踩坑提示:
- 开启kafka的时候提供的是服务器的“名字”如
xxxdatanode
,而不是提供ip地址 export SPARK_KAFKA_VERSION=0.10
- spark-shell(或spark-submit)添加一个参数
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
关于Kafka的简介
- “topic”可以理解为数据库的一张表。
- “producer”生产数据到指定的”TopicA”;
- “consumer”从”TopicA”消费数据(又分 [仅解析最新] 或 [从头开始解析] 两种模式)。
- “zookeeper”起到监听、挂起、重启Kafka任务的作用。
- 参考过的资料:
kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转) – 李克华 – 博客园 www.cnblogs.com
为什么Kafka那么快 – z69183787的专栏 – CSDN博客 blog.csdn.net
震惊了!原来这才是kafka! www.jianshu.com
SparkStreaming与StructuredStreaming的对比:
- StructuredStreaming支持 实时流 以及 一个实例支持多个流。
- 更好的元数据管理。
- 更友好的API,很多基于DataFrame的封装基本可以直接使用。
- 从个人使用体验来看,StructuredStreaming写起来非常得“流畅”、“顺手”。
- 参考过的资料:
是时候丢掉Spark Streaming 升级到Structured Streaming了 www.jianshu.com
Quora上相关的讨论 www.quora.com
报错及解决: