
Spark Streaming with Kafka: error when using window operations

When integrating Spark Streaming 2.0.0 with Kafka 0.10.0, a KafkaConsumer multi-threaded access error occurs.
Part of the code:



  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

  val ssc = new StreamingContext(sc, Seconds(5))
  // The inverse-reduce form of reduceByKeyAndWindow below requires checkpointing
  // (presumably set in the original job); the directory here is a placeholder.
  ssc.checkpoint("/tmp/checkpoint")

  val stream = KafkaUtils.createDirectStream(ssc,
    PreferConsistent,
    Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

  val data = stream.map(_.value())
  val word = data.map(w => (w, 1))
  // 2-minute window sliding every 1 minute. reduceByKeyAndWindow needs the
  // (key, count) stream, so it is called on word, not data as originally posted.
  val result = word.reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y }, Minutes(2), Minutes(1))

  word.print()
  result.print()
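
The snippet uses kafkaParams and fromOffsets without defining them. A minimal hypothetical definition, with illustrative broker, group, topic, and offset values that are not from the original post:

  import org.apache.kafka.common.TopicPartition
  import org.apache.kafka.common.serialization.StringDeserializer

  // Illustrative values only.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "example-group",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  // Start reading partition 0 of "example-topic" from offset 0.
  val fromOffsets = Map(new TopicPartition("example-topic", 0) -> 0L)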

Exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 34.0 failed 1 times, most recent failure: Lost task 1.0 in stage 34.0 (TID 126, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1131)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1131)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    ... 3 more

The error occurs only when slideDuration is much larger than batchDuration. For example:
with batchDuration = 5s and slideDuration = 60s, the error occurs;
with batchDuration = 5s and slideDuration = 10s, it does not.

The CachedKafkaConsumer frames in the trace suggest why: spark-streaming-kafka-0-10 caches a single KafkaConsumer per topic partition per executor, and the incremental form of reduceByKeyAndWindow reads the offset ranges of many past batches in one job (adding the new slide's batches and subtracting the old ones). The larger the slide relative to the batch, the more tasks in that job target the same topic partition concurrently on one executor, and KafkaConsumer rejects concurrent access.
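
Two mitigations are commonly suggested for this pattern (it appears to be the behavior tracked upstream as SPARK-19185). The sketch below is unverified against Spark 2.0.0: the app name is a placeholder, and the cache-disabling key is the one documented for later spark-streaming-kafka-0-10 releases, so it may simply be ignored on 2.0.0.

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel

  // (1) Disable the per-executor consumer cache so concurrent tasks stop sharing
  //     one KafkaConsumer. Key documented for later 0-10 releases; may not exist
  //     in 2.0.0.
  val conf = new SparkConf()
    .setAppName("kafka-window") // placeholder
    .set("spark.streaming.kafka.consumer.cache.enabled", "false")

  // (2) Persist each batch when it is first computed, so the window job reads
  //     cached blocks instead of re-pulling the same topic partition from Kafka
  //     in several concurrent tasks. `stream` is the DStream from the snippet above.
  val data = stream.map(_.value())
  data.persist(StorageLevel.MEMORY_AND_DISK)

Running executors with a single core has also been reported to avoid the contention, since it serializes the tasks that would otherwise share a cached consumer, at the cost of per-executor parallelism.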