热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark流式处理与Kafka

概要本文将涉及StructuredStreaming的两种输入形式:socket、kafka,以及两种输出形式:memory、kafka。在低版本的Spark中要写入kafka需要

概要

  • 本文将涉及StructuredStreaming的两种输入形式:socket、kafka,以及两种输出形式:memorykafka
  • 在低版本的Spark中要写入kafka需要构建ForeachWriter,高版本(2.3+)可以直接使用xxx.writeStream.format("kafka")

实例

  1. SparkStreaming 与 Socket

Spark Streaming基础学习【一】WordCountblog.csdn.net《Spark 流式处理与Kafka》

2. StructuredStreaming 与 Socket

首先打开socket nc -lk 6009 (可能需要安装 yum install nc )

并用telnet xx.xx.xx.xx 6009 监听这个端口,确认功能正常。

《Spark 流式处理与Kafka》
《Spark 流式处理与Kafka》

Spark-shell里如下:

// 加载数据 val loadDF = {
spark.readStream
.format("socket") // 备选参数有 socket, kafka, file .option("host","xx.xx.xx.xx") // 发信端的ip .option("port","6009") // 发信端的port .load() // 加载 }
// 指定输出到内存(即存在一张表里) loadDF.writeStream
.outputMode("append") // 备选参数有 update, complete 详细解释参见下方 .queryName("tmpTable") // query的名字 .format("memory") // 写到内存的一张表里,用query的名字作为表名 .start() // 开启 .awaitTermination() // 保持活跃直到socket端停止 // socket发信端传过数据后,这边显示一下 spark.sql("select * from tmpTable").show

结果如下:

《Spark 流式处理与Kafka》
《Spark 流式处理与Kafka》

3. StructuredStreaming 与 Kafka

首先创建kafka测试topic并“生产”消息。

kafka-console-producer --broker-list nodename:9092 --topic test.topic --property "parse.key=true" --property "key.separator=:"

然后spark侧“订阅”该topic

spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "nodename:9092")
.option("subscribe", "test.topic")
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key as STRING)","CAST(value as STRING)")
.as[(String,String)]

踩坑提示

  1. 开启kafka的时候提供的是服务器的“名字”如xxxdatanode,而不是提供ip地址
  2. export SPARK_KAFKA_VERSION=0.10
  3. spark-shell(或spark-submit)添加一个参数

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

关于Kafka的简介

    • “topic”可以理解为数据库的一张表。
    • “producer”生产数据到指定的”TopicA”;
    • “consumer”从”TopicA”消费数据(又分 [仅解析最新] 或 [从头开始解析] 两种模式)。
    • “zookeeper”起到监听、挂起、重启Kafka任务的作用。
    • 参考过的资料:
      • 第一个有讲到zookeeper的作用

kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转) – 李克华 – 博客园www.cnblogs.com《Spark 流式处理与Kafka》
为什么Kafka那么快 – z69183787的专栏 – CSDN博客blog.csdn.net《Spark 流式处理与Kafka》
震惊了!原来这才是kafka!www.jianshu.com《Spark 流式处理与Kafka》

SparkStreaming与StructuredStreaming的对比:

    • StructuredStreaming支持 实时流 以及 一个实例支持多个流
    • 更好的元数据管理。
    • 更友好的API,很多基于DataFrame的封装基本可以直接使用。
    • 从个人使用体验来看,StructuredStreaming写起来非常得“流畅”、“顺手”。
    • 参考过的资料:

是时候丢掉Spark Streaming 升级到Structured Streaming了www.jianshu.com
Quora上相关的讨论www.quora.com

报错及解决:


推荐阅读
  • 本文介绍了在实现了System.Collections.Generic.IDictionary接口的泛型字典类中如何使用foreach循环来枚举字典中的键值对。同时还讨论了非泛型字典类和泛型字典类在foreach循环中使用的不同类型,以及使用KeyValuePair类型在foreach循环中枚举泛型字典类的优势。阅读本文可以帮助您更好地理解泛型字典类的使用和性能优化。 ... [详细]
  • ubuntu用sqoop将数据从hive导入mysql时,命令: ... [详细]
  • 本文介绍了使用cacti监控mssql 2005运行资源情况的操作步骤,包括安装必要的工具和驱动,测试mssql的连接,配置监控脚本等。通过php连接mssql来获取SQL 2005性能计算器的值,实现对mssql的监控。详细的操作步骤和代码请参考附件。 ... [详细]
  • 解决.net项目中未注册“microsoft.ACE.oledb.12.0”提供程序的方法
    在开发.net项目中,通过microsoft.ACE.oledb读取excel文件信息时,报错“未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序”。本文提供了解决这个问题的方法,包括错误描述和代码示例。通过注册提供程序和修改连接字符串,可以成功读取excel文件信息。 ... [详细]
  • POCOCLibraies属于功能广泛、轻量级别的开源框架库,它拥有媲美Boost库的功能以及较小的体积广泛应用在物联网平台、工业自动化等领域。POCOCLibrai ... [详细]
  • 渗透测试基础bypass绕过阻挡我们的WAF(下)
    渗透测试基础-bypass ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • phpcomposer 那个中文镜像是不是凉了 ... [详细]
  • 本文详细介绍了git常用命令及其操作方法,包括查看、添加、提交、删除、找回等操作,以及如何重置修改文件、抛弃工作区修改、将工作文件提交到本地暂存区、从版本库中删除文件等。同时还介绍了如何从暂存区恢复到工作文件、恢复最近一次提交过的状态,以及如何合并多个操作等。 ... [详细]
  • java实现rstp格式转换使用ffmpeg实现linux命令第一步安装node.js和ffmpeg第二步搭建node.js启动websocket接收服务
    java实现rstp格式转换使用ffmpeg实现linux命令第一步安装node.js和ffmpeg第二步搭建node.js启动websocket接收服务第三步java实现 ... [详细]
  • Ihaveaforminadirectivetemplate:我在指令模板中有一个表单:<formn ... [详细]
  • 浅谈Java8 的foreach跳出循环break/return_java
    这篇文章主要介绍了Java8的foreach跳出循环breakreturn,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完 ... [详细]
  • 作为一个扩展,你需要了解DotNetZip用法,请参见:C#.NET使用第三方类库DotNetZip解压压缩Zip文件你也需要了解单文件内嵌入资源文件基本方法,参见:WPF调用嵌入 ... [详细]
author-avatar
奇力0_843
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有