热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkStreaming+Kafka+ES使用笔记

emm非专业开发,一点笔记Kafka当Kafka内容特别大时会报错,此时设置下fetch.message.max.bytes为一个比较大的值就好。

emm

非专业开发,一点笔记

Kafka


  1. 当Kafka内容特别大时会报错,此时设置下fetch.message.max.bytes为一个比较大的值就好。

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,"fetch.message.max.bytes" -> "10485760" )

  1. 关于partitions
    KafkaDirectStream提供的分片数与Kafka的分片数相同
    如果运算量远远超过了重新分片的消耗,可以用repartition,否则还是增加同时运行的Jobs数量。

Spark Streaming


  1. 增加同时运行的任务数量

SparkStreaming默认只启动一个Job,所以使用核心再多如果任务数量不够的话核心也不能充分利用。

为了提高任务个数需要使用设置spark.streaming.concurrentJobs参数:

spark-submit --conf spark.streaming.concurrentJobs=8 ....

实际上一个Job会分成多个Tasks,每个CPU核心执行一个Task,Task执行完成则Core被释放,也就是说8个partitions的Streaming,使用32个核心并不是只能执行4个Jobs,可以根据Spark WebUI的executor页面核心的使用量,适当的增大concurrentJobs或减少核心使用量。
像我这就是利用率比较低,并且Streaming任务一直跟得上,就可以适当降低Cores的数量。
在这里插入图片描述
2. GC优化
使用CMS内存收集器:

spark-submit --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

自从使用了这个收集器,GC时间下来了,内存也不容易超限了,一口气上五楼都不费劲了~具体原理不清楚,回头补。

  1. 有关cache
    数据有复用的位置一定要记得cache,否则会从头开始执行处理流程。
    被cache的类型需要能够进行序列化。

  2. 有关序列化与反序列化
    Driver会将Task的内容打包序列化发给Executor,所以需要Task中所有被引用的类型都可以序列化。
    如果类型不可序列化则会报object not serializable的错误,此时需要自己实现序列化与反序列化方法,一般只需要实现反序列化方法(readObject)
    scala需要加上serializable注解,java实现serializable接口。

private def readObject(in: ObjectInputStream):Unit = {//调用默认的ReadObject函数in.defaultReadObject()//重新初始化一些无法被序列化的内容this.init(this.config_map)}

对于无法序列化的属性(Mysql连接、Redis连接等等等)需要在属性前加上transient修饰符,表示在序列化时忽略,然后在readObject中再进行构造。

scala还有一个lazy修饰符,表明使用时再进行构建,所以也可以使用lazy+transient修饰符,让其在使用时重新进行构建。

@transient lazy val logger:Logger = LogManager.getLogger(this.getClass.getName)

如下例子进行参考,一个包装的Kafka的类:

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {/* This is the key idea that allows us to work around running intoNotSerializableExceptions. */lazy val producer = createProducer()def send(topic: String, key: K, value: V): Future[RecordMetadata] =producer.send(new ProducerRecord[K, V](topic, key, value))def send(topic: String, value: V): Future[RecordMetadata] =producer.send(new ProducerRecord[K, V](topic, value))
}object KafkaSink {import scala.collection.JavaConversions._def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {val createProducerFunc = () => {val producer = new KafkaProducer[K, V](config)sys.addShutdownHook {// Ensure that, on executor JVM shutdown, the Kafka producer sends// any buffered messages to Kafka before shutting down.producer.close()}producer}new KafkaSink(createProducerFunc)}def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}

5.广播变量
Task过程中使用的变量每次都会序列化传输一次,如果想验证可以使用上面的方法重写readObject打印一些调试信息进行记录。

而一些长时间不变并且比较大、复杂的内容可以使用广播变量进行保存,保证每个executor只存在一份该变量。

比如 Redis连接、MySQL连接、规则配置什么的就可以使用广播变量。

广播变量包裹的类同样需要能够序列化。

广播变量为只读变量。

详细广播变量的使用可以看如下文章:
https://www.jianshu.com/p/3bd18acd2f7f

  1. 利用广播变量进行配置更新:

详细可以看这篇文章:
https://www.cnblogs.com/liuliliuli2017/p/6782687.html
某个大佬写的包装类:

// This wrapper lets us update brodcast variables within DStreams' foreachRDD
// without running into serialization issues
case class BroadcastWrapper[T: ClassTag](@transient private val ssc: StreamingContext,@transient private val _v: T) {@transient private var v = ssc.sparkContext.broadcast(_v)def update(newValue: T, blocking: Boolean = false): Unit = {// 删除RDD是否需要锁定v.unpersist(blocking)v = ssc.sparkContext.broadcast(newValue)}def value: T = v.valueprivate def writeObject(out: ObjectOutputStream): Unit = {out.writeObject(v)}private def readObject(in: ObjectInputStream): Unit = {v = in.readObject().asInstanceOf[Broadcast[T]]}
}

ElasticSearch


  1. index名字只能小写,赤裸裸的教训啊= =
  2. 如果只使用单个ES账号,可以使用全局配置的es.nodes等参数,如果使用多个ES源的话可以写入多个配置MAP:

var esin_setting = Map[String,String]("es.nodes"->"es1","es.port"->"7001")var esout_setting = Map[String,String]("es.nodes"->"es2","es.port"->"7001","es.scroll.size"->"5000")
val rdd = sc.esRDD("indexin", query,esin_setting)
rdd.saveToEs("esout/type1",esout_setting)

3.更多复杂的配置项,可以参考ElasticSearch的官网配置文档:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html


推荐阅读
  • 在过去,我曾使用过自建MySQL服务器中的MyISAM和InnoDB存储引擎(也曾尝试过Memory引擎)。今年初,我开始转向阿里云的关系型数据库服务,并深入研究了其高效的压缩存储引擎TokuDB。TokuDB在数据压缩和处理大规模数据集方面表现出色,显著提升了存储效率和查询性能。通过实际应用,我发现TokuDB不仅能够有效减少存储成本,还能显著提高数据处理速度,特别适用于高并发和大数据量的场景。 ... [详细]
  • 本文深入解析了JDK 8中HashMap的源代码,重点探讨了put方法的工作机制及其内部参数的设定原理。HashMap允许键和值为null,但键为null的情况只能出现一次,因为null键在内部通过索引0进行存储。文章详细分析了capacity(容量)、size(大小)、loadFactor(加载因子)以及红黑树转换阈值的设定原则,帮助读者更好地理解HashMap的高效实现和性能优化策略。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • Android中将独立SO库封装进JAR包并实现SO库的加载与调用
    在Android开发中,将独立的SO库封装进JAR包并实现其加载与调用是一个常见的需求。本文详细介绍了如何将SO库嵌入到JAR包中,并确保在外部应用调用该JAR包时能够正确加载和使用这些SO库。通过这种方式,开发者可以更方便地管理和分发包含原生代码的库文件,提高开发效率和代码复用性。文章还探讨了常见的问题及其解决方案,帮助开发者避免在实际应用中遇到的坑。 ... [详细]
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • 线程能否先以安全方式获取对象,再进行非安全发布? ... [详细]
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 开发日志:201521044091 《Java编程基础》第11周学习心得与总结
    开发日志:201521044091 《Java编程基础》第11周学习心得与总结 ... [详细]
  • Java中不同类型的常量池(字符串常量池、Class常量池和运行时常量池)的对比与关联分析
    在研究Java虚拟机的过程中,笔者发现存在多种类型的常量池,包括字符串常量池、Class常量池和运行时常量池。通过查阅CSDN、博客园等相关资料,对这些常量池的特性、用途及其相互关系进行了详细探讨。本文将深入分析这三种常量池的差异与联系,帮助读者更好地理解Java虚拟机的内部机制。 ... [详细]
  • 本文探讨了如何通过编程手段在Linux系统中禁用硬件预取功能。基于Intel® Core™微架构的应用性能优化需求,文章详细介绍了相关配置方法和代码实现,旨在帮助开发人员有效控制硬件预取行为,提升应用程序的运行效率。 ... [详细]
  • 本文探讨了使用JavaScript在不同页面间传递参数的技术方法。具体而言,从a.html页面跳转至b.html时,如何携带参数并使b.html替代当前页面显示,而非新开窗口。文中详细介绍了实现这一功能的代码及注释,帮助开发者更好地理解和应用该技术。 ... [详细]
  • 本文深入探讨了HTTP头部中的Expires与Cache-Control字段及其缓存机制。Cache-Control字段主要用于控制HTTP缓存行为,其在HTTP/1.1中得到了广泛应用,而HTTP/1.0中主要使用Pragma:no-cache来实现类似功能。Expires字段则定义了资源的过期时间,帮助浏览器决定是否从缓存中读取资源。文章详细解析了这两个字段的具体用法、相互关系以及在不同场景下的应用效果,为开发者提供了全面的缓存管理指南。 ... [详细]
  • 当遇到“Warning: Default Activity not found”这一警告时,通常是由于上次强制退出导致 Android Studio 的配置文件损坏所致。解决方法如下:1. 关闭 Android Studio;2. 导航至 Android Studio 的安装目录,找到并删除 `.idea` 和 `build` 文件夹;3. 重新启动 Android Studio 并同步项目。这些步骤有助于恢复项目的正常配置,确保应用能够顺利运行。 ... [详细]
  • MySQL索引详解及其优化策略
    本文详细解析了MySQL索引的概念、数据结构及管理方法,并探讨了如何正确使用索引以提升查询性能。文章还深入讲解了联合索引与覆盖索引的应用场景,以及它们在优化数据库性能中的重要作用。此外,通过实例分析,进一步阐述了索引在高读写比系统中的必要性和优势。 ... [详细]
author-avatar
雅枝建彰3
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有