热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Sparkmetrics实现KafkaSink

背景监控是Spark非常重要的一部分。Spark的运行情况是由ListenerBus以及MetricsSystem来完成的。通过Spark的Metrics系统,我们可

背景

监控是Spark非常重要的一部分。Spark的运行情况是由ListenerBus以及MetricsSystem 来完成的。通过Spark的Metrics系统,我们可以把Spark Metrics的收集到的信息发送到各种各样的Sink,比如HTTP、JMX以及CSV文件。
目前支持的Sink包括:

  • ConsoleSink

  • CSVSink

  • JmxSink

  • MetricsServlet

  • GraphiteSink

  • GangliaSink

有时我们需要实时获取metrics数据通过spark分析展示等需求,这个时候若有个KafkaSink将metrics指标数据实时往kafka发送那就太方便了,故有了这篇博文。

实践

所有的Sink都需要继承Sink这个特质:

private[spark] trait Sink { def start(): Unit def stop(): Unit def report(): Unit}

当该Sink注册到metrics系统中时,会调用start方法进行一些初始化操作,再通过report方式进行真正的输出操作,stop方法可以进行一些连接关闭等操作。直接上代码:

package org.apache.spark.metrics.sink
import java.util.concurrent.TimeUnitimport java.util.{Locale, Properties}
import com.codahale.metrics.MetricRegistryimport org.apache.kafka.clients.producer.KafkaProducerimport org.apache.spark.SecurityManagerimport org.apache.spark.internal.Logging
private[spark] class KafkaSink(val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink with Logging{val KAFKA_KEY_PERIOD = "period" val KAFKA_DEFAULT_PERIOD = 10val KAFKA_KEY_UNIT = "unit" val KAFKA_DEFAULT_UNIT = "SECONDS"val KAFKA_TOPIC = "topic" val KAFKA_DEFAULT_TOPIC = "kafka-sink-topic"val KAFAK_BROKERS = "kafka-brokers" val KAFAK_DEFAULT_BROKERS = "XXX:9092"val TOPIC = Option(property.getProperty(KAFKA_TOPIC)).getOrElse(KAFKA_DEFAULT_TOPIC) val BROKERS = Option(property.getProperty(KAFAK_BROKERS)).getOrElse(throw new IllegalStateException("kafka-brokers is null!"))private val kafkaProducerCOnfig= new Properties() kafkaProducerConfig.put("bootstrap.servers",BROKERS) kafkaProducerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") kafkaProducerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")private val producer = new KafkaProducer[String, String](kafkaProducerConfig)private val reporter: KafkaReporter = KafkaReporter.forRegistry(registry) .topic(TOPIC) .build(producer)val pollPeriod = Option(property.getProperty(KAFKA_KEY_PERIOD)) match { case Some(s) => s.toInt case NOne=> KAFKA_DEFAULT_PERIOD }val pollUnit: TimeUnit = Option(property.getProperty(KAFKA_KEY_UNIT)) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) case NOne=> TimeUnit.valueOf(KAFKA_DEFAULT_UNIT) }override def start(): Unit = { log.info("I4 Metrics System KafkaSink Start ......") reporter.start(pollPeriod, pollUnit) }override def stop(): Unit = { log.info("I4 Metrics System KafkaSink Stop ......") reporter.stop() producer.close() }override def report(): Unit = { log.info("I4 Metrics System KafkaSink Report ......") reporter.report() }}

KafkaReporter类:

package org.apache.spark.metrics.sink;import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.*;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;public class KafkaReporter extends ScheduledReporter {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);public static KafkaReporter.Builder forRegistry(MetricRegistry registry) {return new KafkaReporter.Builder(registry);}private KafkaProducer producer;private Clock clock;private String topic;private KafkaReporter(MetricRegistry registry,TimeUnit rateUnit,TimeUnit durationUnit,MetricFilter filter,Clock clock,String topic,KafkaProducer producer) {super(registry, "kafka-reporter", filter, rateUnit, durationUnit);this.producer = producer;this.topic = topic;this.clock = clock;}@Overridepublic void report(SortedMap gauges, SortedMap counters, SortedMap histograms, SortedMap meters, SortedMap timers) {final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());// Gaugefor (Map.Entry entry : gauges.entrySet()) {reportGauge(timestamp,entry.getKey(), entry.getValue());}// Histogram
// for (Map.Entry entry : histograms.entrySet()) {
// reportHistogram(timestamp, entry.getKey(), entry.getValue());
// }}private void reportGauge(long timestamp, String name, Gauge gauge) {report(timestamp, name, gauge.getValue());}private void reportHistogram(long timestamp, String name, Histogram histogram) {final Snapshot snapshot = histogram.getSnapshot();report(timestamp, name, snapshot.getMax());}private void report(long timestamp, String name, Object values) {JSONObject jsOnObject= new JSONObject();jsonObject.put("name",name);jsonObject.put("timestamp",timestamp);jsonObject.put("value",values);producer.send(new ProducerRecord(topic,name, jsonObject.toJSONString()));}public static class Builder {private final MetricRegistry registry;private TimeUnit rateUnit;private TimeUnit durationUnit;private MetricFilter filter;private Clock clock;private String topic;private Builder(MetricRegistry registry) {this.registry = registry;this.rateUnit = TimeUnit.SECONDS;this.duratiOnUnit= TimeUnit.MILLISECONDS;this.filter = MetricFilter.ALL;this.clock = Clock.defaultClock();}/*** Convert rates to the given time unit.** @param rateUnit a unit of time* @return {@code this}*/public KafkaReporter.Builder convertRatesTo(TimeUnit rateUnit) {this.rateUnit = rateUnit;return this;}/*** Convert durations to the given time unit.** @param durationUnit a unit of time* @return {@code this}*/public KafkaReporter.Builder convertDurationsTo(TimeUnit durationUnit) {this.duratiOnUnit= durationUnit;return this;}/*** Use the given {@link Clock} instance for the time.** @param clock a {@link Clock} instance* @return {@code this}*/public Builder withClock(Clock clock) {this.clock = clock;return this;}/*** Only report metrics which match the given filter.** @param filter a {@link MetricFilter}* @return {@code this}*/public KafkaReporter.Builder filter(MetricFilter filter) {this.filter = filter;return this;}/*** Only report metrics which match the given filter.** @param topic a* @return {@code this}*/public KafkaReporter.Builder topic(String topic) {this.topic = topic;return this;}/*** Builds a {@link KafkaReporter} with the given properties, writing {@code .csv} files to the* given directory.** @return a {@link KafkaReporter}*/public KafkaReporter build(KafkaProducer producer) {return new KafkaReporter(registry,rateUnit,durationUnit,filter,clock,topic,producer);}}
}

其中的report方法就是获取各种类型指标,并进行对应的输出操作的时机。

如何使用

可在配置文件或者程序中设定需要注册的sink,并带上对应的参数即可:

spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSinkspark.metrics.conf.*.sink.kafka.kafka-brokers=XXX:9092

原文:https://www.jianshu.com/p/cee005368b61



推荐阅读
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 本文介绍了在 Java 编程中遇到的一个常见错误:对象无法转换为 long 类型,并提供了详细的解决方案。 ... [详细]
  • 1.创建目录mkdir-phomerocketmqnamesvr1data&&mkdir-phomerocketmqnamesvr1log&&mkdir-phomerocketm ... [详细]
  • 技术分享:使用 Flask、AngularJS 和 Jinja2 构建高效前后端交互系统
    技术分享:使用 Flask、AngularJS 和 Jinja2 构建高效前后端交互系统 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 本文详细探讨了使用纯JavaScript开发经典贪吃蛇游戏的技术细节和实现方法。通过具体的代码示例,深入解析了游戏逻辑、动画效果及用户交互的实现过程,为开发者提供了宝贵的参考和实践经验。 ... [详细]
  • 本文介绍了 Java 中 io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltSock() 方法的使用及其代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • javascript分页类支持页码格式
    前端时间因为项目需要,要对一个产品下所有的附属图片进行分页显示,没考虑ajax一张张请求,所以干脆一次性全部把图片out,然 ... [详细]
  • Hyperledger Fabric 1.4 节点 SDK 快速入门指南
    本文将详细介绍如何利用 Hyperledger Fabric 1.4 的 Node.js SDK 开发应用程序。通过最新版本的 Fabric Node.js SDK,开发者可以更高效地构建和部署基于区块链的应用,实现数据的安全共享和交易处理。文章将涵盖环境配置、SDK 安装、示例代码以及常见问题的解决方法,帮助读者快速上手并掌握核心功能。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
author-avatar
柠檬泪滴味道186
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有