热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于Dubbo与Zipkin的微服务调用链路监控解决方案

本文提出了一种基于Dubbo与Zipkin的微服务调用链路监控解决方案。通过抽象配置层,支持HTTP和Kafka两种数据上报方式,实现了灵活且高效的调用链路追踪。该方案不仅提升了系统的可维护性和扩展性,还为故障排查提供了强大的支持。

dubbo+zipkin调用链监控

图片描述(最多50字)
收集器抽象

由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象。

AbstractZipkinCollectorConfiguration

主要是针对下面两种收集方式的一些配置上的定义,最核心的是Sender接口的定义,http与kafka是两类完全不同的实现。

public abstract Sender getSender();
其次是协助性的构造函数,主要是配合构建收集器所需要的一些参数。

zipkinUrl
如果是http收集,那么对应的是zipkin api域名,如果是kafka,对应的是kafka集群的地址

topic
仅在收集方式为kafka是有效,http时传空值即可。

public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){
this.zipkinUrl=zipkinUrl;
this.serviceName=serviceName;
this.topic=topic;
this.tracing=this.tracing();
}
配置上报方式,这里统一采用异常上传,并且配置上报的超时时间。

protected AsyncReporter spanReporter() {
return AsyncReporter
.builder(getSender())
.closeTimeout(500, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);
}
下面这两方法,是配合应用构建span使用的。

注意那个sampler()方法,默认是什么也不做的意思,我们要想看到数据就需要配置成Sampler.ALWAYS_SAMPLE,这样才能真正将数据上报到zipkin服务器。
protected Tracing tracing() {
this.tracing= Tracing
.newBuilder()
.localServiceName(this.serviceName)
.sampler(Sampler.ALWAYS_SAMPLE)
.spanReporter(spanReporter())
.build();
return this.tracing;
}
protected Tracing getTracing(){
return this.tracing;
}
HttpZipkinCollectorConfiguration

主要是实现getSender方法,可以借用OkHttpSender这个对象来快速构建,api版本采用v2。

public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) {super(serviceName,zipkinUrl,null);
}
@Override
br/>super(serviceName,zipkinUrl,null);
}
@Override
public Sender getSender() {
return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans");
}
}
OkHttpSender这个类需要引用这个包


io.zipkin.reporter2
zipkin-sender-okhttp3
${zipkin-reporter2.version}

KafkaZipkinCollectorConfiguration

同样也是实现getSender方法

public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) {br/>super(serviceName,zipkinUrl,topic);
}
@Override
public Sender getSender() {
return KafkaSender
.newBuilder()
.bootstrapServers(super.getZipkinUrl())
.topic(super.getTopic())
.encoding(Encoding.JSON)
.build();
}
}
KafkaSender这个类需要引用这个包:


io.zipkin.reporter2
zipkin-sender-kafka11
${zipkin-reporter2.version}

收集器工厂

由于上面创建了两个收集器配置类,使用时只能是其中之一,所以实际运行的实例需要根据配置来动态生成。ZipkinCollectorConfigurationFactory就是负责生成收集器实例的。

private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration;br/>@Autowired
public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){
if(Objects.equal("kafka", traceConfig.getZipkinSendType())){
zipkinCollectorCOnfiguration=new KafkaZipkinCollectorConfiguration(
traceConfig.getApplicationName(),
traceConfig.getZipkinUrl(),
traceConfig.getZipkinKafkaTopic());
}
else {
zipkinCollectorCOnfiguration= new HttpZipkinCollectorConfiguration(
traceConfig.getApplicationName(),
traceConfig.getZipkinUrl());
}
}
通过构建函数将我们的配置类TraceConfig注入进来,然后根据发送方式来构建实例。另外提供一个辅助函数:

public Tracing getTracing(){
return this.zipkinCollectorConfiguration.getTracing();
}
过滤器

在dubbo的过滤器中实现数据上传的功能逻辑相对简单,一般都在invoke方法执行前记录数据,然后方法执行完成后再次记录数据。这个逻辑不变,有变化的是数据上报的实现,上一个版本是通过发http请求实现需要编码,现在可以直接借用brave所提供的span来帮助我们完成,有两重要的方法:

finish
方法源码如下,在完成的时候会填写上完成的时间并上报数据,这一般应用于同步调用场景。

public void finish(TraceContext context, long finishTimestamp) {
MutableSpan span = this.spanMap.remove(context);
if(span != null && !this.noop.get()) {
synchronized(span) {
span.finish(Long.valueOf(finishTimestamp));
this.reporter.report(span.toSpan());
}
}
}
flush 与上面finish方法的不同点在于,在报数据时没有完成时间,这应该是适用于一些异步调用但不关心结果的场景,比如dubbo所提供的oneway方式调用。
public void flush(TraceContext context) {
MutableSpan span = this.spanMap.remove(context);
if(span != null && !this.noop.get()) {
synchronized(span) {
span.finish((Long)null);
this.reporter.report(span.toSpan());
}
}
}
消费者

做为消费方,有一个核心功能就是将traceId以及spanId传递到服务提供方,这里还是通过dubbo提供的附加参数方式实现。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
if(!RpcTraceContext.getTraceConfig().isEnabled()){
return invoker.invoke(invocation);
}
ZipkinCollectorConfigurationFactory zipkinCollectorCOnfigurationFactory=
SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
if(null==RpcTraceContext.getTraceId()){
RpcTraceContext.start();
RpcTraceContext.setTraceId(IdUtils.get());
RpcTraceContext.setParentId(null);
RpcTraceContext.setSpanId(IdUtils.get());
}
else {
RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
RpcTraceContext.setSpanId(IdUtils.get());
}
TraceContext traceCOntext= TraceContext.newBuilder()
.traceId(RpcTraceContext.getTraceId())
.parentId(RpcTraceContext.getParentId())
.spanId(RpcTraceContext.getSpanId())
.sampled(true)
.build();
Span span=tracer.toSpan(traceContext).start();
invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));
Result result = invoker.invoke(invocation);
span.finish();
return result;
}
提供者

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
if(!RpcTraceContext.getTraceConfig().isEnabled()){
return invoker.invoke(invocation);
}
Map attaches = invocation.getAttachments();
if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){
return invoker.invoke(invocation);
}
Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY));
Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY));
attaches.remove(RpcTraceContext.TRACE_ID_KEY);
attaches.remove(RpcTraceContext.SPAN_ID_KEY);
RpcTraceContext.start();
RpcTraceContext.setTraceId(traceId);
RpcTraceContext.setParentId(spanId);
RpcTraceContext.setSpanId(IdUtils.get());
ZipkinCollectorConfigurationFactory zipkinCollectorCOnfigurationFactory=
SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
TraceContext traceCOntext= TraceContext.newBuilder()
.traceId(RpcTraceContext.getTraceId())
.parentId(RpcTraceContext.getParentId())
.spanId(RpcTraceContext.getSpanId())
.sampled(true)
.build();
Span span = tracer.toSpan(traceContext).start();
Result result = invoker.invoke(invocation);
span.finish();
return result;
}
异常流程

上面无论是消费者的过滤器还是服务提供者的过滤器,均未考虑服务在调用invoker.invoke时出错的场景,如果出错,后面的span.finish方法将不会按预期执行,也就记录不了信息。所以需要针对此问题做优化:可以在finally块中执行finish方法。

try {
result = invoker.invoke(invocation);
}
finally {
span.finish();
}
消费者在调用服务时,异步调用问题

上面过滤器中调用span.finish都是基于同步模式,而由于dubbo除了同步调用外还提供了两种调用方式

异步调用 通过callback机制的异步
oneway
只发起请求并不等待结果的异步调用,无callback一说

针对上面两类异步再加上同步调用,我们要想准确记录服务真正的时间,需要在消费方的过滤器中做如下处理:
创建一个用于回调的处理类,它的主要目的是为了在回调成功时记录时间,这里无论是成功还是失败。

private class AsyncSpanCallback implements ResponseCallback{
private Span span;
public AsyncSpanCallback(Span span){this.span=span;
}
@Override
br/>this.span=span;
}
@Override
public void done(Object o) {
br/>span.finish();
}
@Override
public void caught(Throwable throwable) {
span.finish();
}
}
再在调用invoke方法时,如果是oneway方式,则调用flush方法结果,如果是同步则直接调用finish方法,如果是异步则在回调时调用finish方法。

Result result = null;
boolean isOneway= RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
result = invoker.invoke(invocation);
}
finally {
if(isOneway) {
span.flush();
}
else if(!isAsync) {
span.finish();
}
}


推荐阅读
  • 本文探讨了如何在 PHP 的 Eloquent ORM 中实现数据表之间的关联查询,并通过具体示例详细解释了如何将关联数据嵌入到查询结果中。这不仅提高了数据查询的效率,还简化了代码逻辑。 ... [详细]
  • PHP 5.2.5 安装与配置指南
    本文详细介绍了 PHP 5.2.5 的安装和配置步骤,帮助开发者解决常见的环境配置问题,特别是上传图片时遇到的错误。通过本教程,您可以顺利搭建并优化 PHP 运行环境。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文详细解析了Python中的os和sys模块,介绍了它们的功能、常用方法及其在实际编程中的应用。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 使用Python在SAE上开发新浪微博应用的初步探索
    最近重新审视了新浪云平台(SAE)提供的服务,发现其已支持Python开发。本文将详细介绍如何利用Django框架构建一个简单的新浪微博应用,并分享开发过程中的关键步骤。 ... [详细]
  • PHP 5.5.0rc1 发布:深入解析 Zend OPcache
    2013年5月9日,PHP官方发布了PHP 5.5.0rc1和PHP 5.4.15正式版,这两个版本均支持64位环境。本文将详细介绍Zend OPcache的功能及其在Windows环境下的配置与测试。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • ServiceStack与Swagger的无缝集成指南
    本文详细介绍了如何在ServiceStack项目中集成Swagger,以实现API文档的自动生成和在线测试。通过本指南,您将了解从配置到部署的完整流程,并掌握如何优化API接口的开发和维护。 ... [详细]
  • 全面解析运维监控:白盒与黑盒监控及四大黄金指标
    本文深入探讨了白盒和黑盒监控的概念,以及它们在系统监控中的应用。通过详细分析基础监控和业务监控的不同采集方法,结合四个黄金指标的解读,帮助读者更好地理解和实施有效的监控策略。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • EasyMock实战指南
    本文介绍了如何使用EasyMock进行单元测试,特别是当测试对象的合作者依赖于外部资源或尚未实现时。通过具体的示例,展示了EasyMock在模拟对象行为方面的强大功能。 ... [详细]
  • window下kafka的安装以及测试
    目录一、安装JDK(需要安装依赖javaJDK)二、安装Kafka三、测试参考在Windows系统上安装消息队列kafka一、安装JDKÿ ... [详细]
author-avatar
手机用户2602925213
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有