热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

etcd源码系列之gRPCproxy解读(五)

grpcproxy是一个基于L7层的无状态的gRPC的etcd反向代理服务。这个L7指的是OSI模型中的第七层,会话层。它除了提供etcdclient的基本功能之外,同样提供且优化了以下功能:

grpc proxy是一个基于L7层的无状态的gRPC的etcd反向代理服务。这个L7指的是OSI模型中的第七层,会话层。它除了提供etcd client的基本功能之外,同样提供且优化了以下功能:

  • Watch API
    grpc Pproxy提供监听机制,客户端可以监听某个key或者某些key的变更(v2和v3的机制不同,参看后面文章)。用于监听和推送变更。
    它可以将多个客户端(c-watchers)对同一个key的监控合并到一个链接(s-watcher)到 etcd server的请求。同时它会广播从s-watcher收到的时间到所有的c-watchers。

    如上图所示, 3个client对同一个key A的watcher,注册到gRPC proxy中,gRPC proxy会合并生成一个s-watcher 注册到etcd server。

  • lease API
    lease API支持续约机制,客户端通过定时刷新(heartbean)来实现续约(v2和v3的实现机制也不一样)。用于集群监控以及服务注册发现。
    跟上图类似,为了减少etcd server的交互次数,gRPC proxy同样提供了合并功能:

    如上图所示, 3个client注册到gRPC proxy中(c-stream),通过心跳(heartbeat)来定时续约,gRPC proxy会合并生成一个s-stream 注册到etcd server。

  • 缓存请求
    gRPC proxy会缓存来自客户端的请求,保证etcd server 频繁的被客户端请求滥用。

1. 源码实现

1.1 startGRPCProxy解读

func startGRPCProxy(cmd *cobra.Command, args []string) {
    //1. 校验参数的合法性
    checkArgs()

    //2.判断是否校验 https
    tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
    if tlsinfo == nil && grpcProxyListenAutoTLS {
        host := []string{"https://" + grpcProxyListenAddr}
        dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
        autoTLS, err := transport.SelfCert(dir, host)
        if err != nil {
            plog.Fatal(err)
        }
        tlsinfo = &autoTLS
    }
    if tlsinfo != nil {
        plog.Infof("ServerTLS: %s", tlsinfo)
    }
    //3.生成 cmux路由
    m := mustListenCMux(tlsinfo)

    //4. grpc cmux
    grpcl := m.Match(cmux.HTTP2())
    defer func() {
        grpcl.Close()
        plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
    }()

    //5. 生成一个向etcd服务 注册的stream链接,前面提到的合并链接也就是它产生的。
    client := mustNewClient()
    //6. 一些性能和资源监控的封装,如Prometheus,PProf等
    srvhttp, httpl := mustHTTPListener(m, tlsinfo, client)
    errc := make(chan error)
    //7. grpc 服务
    go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
    //8. http 服务
    go func() { errc <- srvhttp.Serve(httpl) }()
    //9. cmux serve
    go func() { errc <- m.Serve() }()
    //10.下面这个代码,跟上面srvhttp有些重复,只是它把监控信息给通过grpcProxyMetricsListenAddr给独立了出来
    if len(grpcProxyMetricsListenAddr) > 0 {
        mhttpl := mustMetricsListener(tlsinfo)
        go func() {
            mux := http.NewServeMux()
            etcdhttp.HandlePrometheus(mux)
            grpcproxy.HandleHealth(mux, client)
            plog.Fatal(http.Serve(mhttpl, mux))
        }()
    }

    // grpc-proxy is initialized, ready to serve
    notifySystemd()

    fmt.Fprintln(os.Stderr, <-errc)
    os.Exit(1)
}

1.2 服务监控

在上面的代码的第6步中和第10步都是在监控服务器的性能,首先在5步中生成了一直指向etcd服务端口的client,然后启动了两个服务:

  • Prometheus:监控服务的资源

  • health: 监控etcd服务的健康状态

以上两个服务在第10步同样能生成,只要指定参数 metrics-addr
大概的效果,我们分别启动两个服务 $ etcd ,这个服务会默认暴露一个2379的服务端口,然后启动:./etcd grpc-proxy start --metrics-addr=http://127.0.0.1:6061 --enable-pprof=true ,这个也会默认的链接2379这个端口。

  • metrics资源监控

    然后我们通过web浏览器分别打开 http://127.0.0.1:23790/metricshttp://127.0.0.1:6061/metrics 出现两个一样的网页,大概的内容是:

    # HELP etcd_debugging_disk_backend_commit_rebalance_duration_seconds The latency distributions of commit.rebalance called by bboltdb backend.
    # TYPE etcd_debugging_disk_backend_commit_rebalance_duration_seconds histogram
    etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.001"} 0
    etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.002"} 0
    .........
    .....
  • health监控(etcd服务健康状况)

    再打开地址 http://127.0.0.1:6061/healthhttp://127.0.0.1:23790/health,会显示以下内容:

    {"health":"true"}

    这个是显示在 --endpoints--discovery-srv中所指定的etcd server是否正常存活。当终止 etcd 服务之后,再次调用地址会返回 false

  • pprof 调试

    上面有个参数是 --enable-pprof=true,当指定该参数的时候,可以打开地址 http://127.0.0.1:23790/debug/pprof/,来分析程序性能。

    /debug/pprof/
    
    profiles:
    0   block
    26  goroutine
    3   heap
    0   mutex
    12  threadcreate
    
    full goroutine stack dump

总结一下,gRPC proxy 默认会在 --listen-addr 监控etcd服务的状态是否正常,同时也可以指定一个 metrics-addr端口来监控服务。强调一下, --enable-pprof 这个参数只有在 listen-addr 这个地址打开才有效。

1.3 gRPC proxy 服务

到这个地方,就到了这个章节最重要的地方,开始介绍整个gRPC proxy所提供的服务API.

通过第7步进入到具体的代码流程中,newGRPCProxyServer,代码如下:

func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
    if grpcProxyEnableOrdering {
        vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
        client.KV = ordering.NewKV(client.KV, vf)
        lg.Info("waiting for linearized read from cluster to recover ordering")
        for {
            _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
            if err == nil {
                break
            }
            lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
            time.Sleep(time.Second)
        }
    }

    if len(grpcProxyNamespace) > 0 {
        client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
        client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
        client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
    }

    if len(grpcProxyLeasing) > 0 {
        client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
    }

    kvp, _ := grpcproxy.NewKvProxy(client)
    watchp, _ := grpcproxy.NewWatchProxy(client)
    if grpcProxyResolverPrefix != "" {
        grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
    }
    clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
    leasep, _ := grpcproxy.NewLeaseProxy(client)
    mainp := grpcproxy.NewMaintenanceProxy(client)
    authp := grpcproxy.NewAuthProxy(client)
    electionp := grpcproxy.NewElectionProxy(client)
    lockp := grpcproxy.NewLockProxy(client)

    server := grpc.NewServer(
        grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
        grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
        grpc.MaxConcurrentStreams(math.MaxUint32),
    )

    pb.RegisterKVServer(server, kvp)
    pb.RegisterWatchServer(server, watchp)
    pb.RegisterClusterServer(server, clusterp)
    pb.RegisterLeaseServer(server, leasep)
    pb.RegisterMaintenanceServer(server, mainp)
    pb.RegisterAuthServer(server, authp)
    v3electionpb.RegisterElectionServer(server, electionp)
    v3lockpb.RegisterLockServer(server, lockp)

    // set zero values for metrics registered for this grpc server
    grpc_prometheus.Register(server)

    return server
}

上面主要是通过封装 etcd的client 提供各种服务,当接受到来自用户的请求时,通过复用client连接到etcd服务,后面我们来看一下gRPC proxy所封装的各种服务。
这里提一下grpcProxyEnableOrdering和grpcProxyNamespace两个参数的意义:

  • grpcProxyEnableOrdering: experimental-serializable-ordering
    保证grpc proxy的Revision(版本号)小于或等于etcd服务器之间的Revision(版本号),后面再解释这个版本号

  • grpcProxyNamespace:
    为所有的key请求加上前缀空间

1.4 KvProxy

这个是对etcd client的 kv的封装,通过对其结构体创建方法 Newc(c *clientv3.Client)的,我们可以看出:

func Newc(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
    kv := &kvProxy{
        kv:    c.KV,
        cache: cache.NewCache(cache.DefaultMaxEntries),
    }
    donec := make(chan struct{})
    close(donec)
    return kv, donec
}

方法上面通过一个 cache 的封装来缓存客户端请求,这个正好印证前面所说的gRPC proxy可以缓存客户端请求。我们来具体的看一下这个cache是如何处理的。

type Cache interface {
    ////添加查询请求到缓存中
    Add(req *pb.RangeRequest, resp *pb.RangeResponse)
    //从缓存中获取请求结果
    Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
    Compact(revision int64)
    //判断缓存是否失效
    Invalidate(key []byte, endkey []byte)
    //缓存长度
    Size() int
    Close()
}

cache接口主要提供以上的方法来缓存来子客户端的请求信息,看一下具体的cache类:

// cache implements Cache
type cache struct {
    mu  sync.RWMutex
    lru *lru.Cache

    // a reverse index for cache invalidation
    cachedRanges adt.IntervalTree

    compactedRev int64
}

上面有个lru的缓存信息(算法为最近最少未使用),同时实现了一个IntervalTree(线段树),用来缓存范围查询,具体的算法可以查看对应的源码。
总的来说就是在client的kv上面封装了一层代码,加上了一层cache。

1.5 WatchProxy

同上,这个也是封装了client的watch API,如上面所示,gRPC proxy 支持watch的链接复用,

type watchProxy struct {
    cw  clientv3.Watcher
    ctx context.Context

    leader *leader

    ranges *watchRanges

    // mu protects adding outstanding watch servers through wg.
    mu sync.Mutex

    // wg waits until all outstanding watch servers quit.
    wg sync.WaitGroup

    // kv is used for permission checking
    kv clientv3.KV
}

上面这个结构体实际上很明显能够告诉我们,如果watchProxy能够复用连接,那一定是在watchRanges中实现的。
我们先看一下watchProxy.Watch方法 由于方法太长,所以我简略的说下步骤:

1. 检查watchProxy是否退出
2. wp.wg.Add(1): watch servers +1
3.生成一个watchProxyStream结构体
4.再次判断leader是否丢失链接
5.循环判断watchProxyStream的recvLoop以及sendLoop 方法

首先我们要知道leader的作用是什么?
打开 leader.go 文件中的发现有一个叫 recvLoop() 的方法,**这个方法的作用实际上就是通过对一个key(__lostleader)的监视来定时的判断client是否失效。**
所以实际上我们的真正的业务逻辑在watchProxyStream这个结构体中中。

先来看一下 watchProxyStream.recvLoop()这个方法:

func (wps *watchProxyStream) recvLoop() error {
    for {
        req, err := wps.stream.Recv()
        if err != nil {
            return err
        }
        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest:
            cr := uv.CreateRequest

            if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
                // Return WatchResponse which is caused by permission checking if and only if
                // the error is permission denied. For other errors (e.g. timeout or connection closed),
                // the permission checking mechanism should do nothing for preserving error code.
                wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
                continue
            }

            w := &watcher{
                wr:  watchRange{string(cr.Key), string(cr.RangeEnd)},
                id:  wps.nextWatcherID,
                wps: wps,

                nextrev:  cr.StartRevision,
                progress: cr.ProgressNotify,
                prevKV:   cr.PrevKv,
                filters:  v3rpc.FiltersFromRequest(cr),
            }
            if !w.wr.valid() {
                w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
                continue
            }
            wps.nextWatcherID++
            w.nextrev = cr.StartRevision
            wps.watchers[w.id] = w
            wps.ranges.add(w)
        case *pb.WatchRequest_CancelRequest:
            wps.delete(uv.CancelRequest.WatchId)
        default:
            panic("not implemented")
        }
    }
}

首先我们来明确一下 wps.stream这个流是属于与gRPC proxy连接的客户端。所以当收到来自客户端 WatchRequest_CreateRequest 请求时,会创建一个 watcher,同时会在 wps.watchers 以及 wps.ranges 中添加该watcher,并且在收到这个请求的时候用 checkPermissionForWatch 会向etcd server 同时发起一个服务,判断是否允许接入链接。在收到来自etcd server的正确回答之后,会在 rangesadd 这个方法在加载这个watcher之后同样会向etcd server 发起请求,并且得到应答之后会广播,这里面实现了一个broadcast的结构体用来做广播。

上面说的这个ranges实际上是来源于watchProxy这个结构体,而watchers来源于同一个stream中,这说明,对于客户端来说,它同样可以复用stream流来处理watch。

我们总结一下,对一来自于同一个客户端的的watch是它的stream可以复用,不同客户端的链接都会被同一链接复用,gRPC只有在收到来之客户端stream的 WatchRequest_CreateRequest请求的时候才会向 etcd server 发起请求。

1.6 lease proxy

我门先来看一下lease proxy的源码:

type leaseProxy struct {
    // leaseClient handles req from LeaseGrant() that requires a lease ID.
    leaseClient pb.LeaseClient

    lessor clientv3.Lease

    ctx context.Context

    leader *leader

    // mu protects adding outstanding leaseProxyStream through wg.
    mu sync.RWMutex

    // wg waits until all outstanding leaseProxyStream quit.
    wg sync.WaitGroup
}

实际上大致的内容大同小异,leader的作用跟上面类似,leaseClient 和 lessor继承于clientv3,后面介绍。在接收到lease请求的时候,会生成一个 leaseProxyStream 结构体,这个结构体有三个方法 recvLoopsendLoop 和上面类似,同样有一个 keepAliveLoop,该方法是 核心方法,它是通过的一个 TTL的时间定时去跟etcd server 续约。

1.6 其它proxy

gRPC proxy 基本上实现etcd client的那一套API,通过对clientv3.Client的封装

2 使用场景

gRPC proxy 通过对 etcd client 的封装,实现了与etcd连接,不同客户端请求复用同一个client,其目的是为了加少etcd server的负载。这个目的也决定了proxy的使用场景,即降低 etcd 负载。


推荐阅读
  • 本文详细介绍了一种利用 ESP8266 01S 模块构建 Web 服务器的成功实践方案。通过具体的代码示例和详细的步骤说明,帮助读者快速掌握该模块的使用方法。在疫情期间,作者重新审视并研究了这一未被充分利用的模块,最终成功实现了 Web 服务器的功能。本文不仅提供了完整的代码实现,还涵盖了调试过程中遇到的常见问题及其解决方法,为初学者提供了宝贵的参考。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • PHP 5.5.31 和 PHP 5.6.17 安全更新发布
    PHP 5.5.31 和 PHP 5.6.17 已正式发布,主要包含多个安全修复。强烈建议所有用户尽快升级至最新版本以确保系统安全。 ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
  • 本文介绍了 Go 语言中的高性能、可扩展、轻量级 Web 框架 Echo。Echo 框架简单易用,仅需几行代码即可启动一个高性能 HTTP 服务。 ... [详细]
  • 本文详细解析了ASP.NET 2.0中的Callback机制,不仅介绍了基本的使用方法,还深入探讨了其背后的实现原理。通过对比Atlas框架,帮助读者更好地理解和应用这一机制。 ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 本文总结了一些开发中常见的问题及其解决方案,包括特性过滤器的使用、NuGet程序集版本冲突、线程存储、溢出检查、ThreadPool的最大线程数设置、Redis使用中的问题以及Task.Result和Task.GetAwaiter().GetResult()的区别。 ... [详细]
  • DVWA学习笔记系列:深入理解CSRF攻击机制
    DVWA学习笔记系列:深入理解CSRF攻击机制 ... [详细]
  • 【实例简介】本文详细介绍了如何在PHP中实现微信支付的退款功能,并提供了订单创建类的完整代码及调用示例。在配置过程中,需确保正确设置相关参数,特别是证书路径应根据项目实际情况进行调整。为了保证系统的安全性,存放证书的目录需要设置为可读权限。值得注意的是,普通支付操作无需证书,但在执行退款操作时必须提供证书。此外,本文还对常见的错误处理和调试技巧进行了说明,帮助开发者快速定位和解决问题。 ... [详细]
  • Hyperledger Fabric 1.4 节点 SDK 快速入门指南
    本文将详细介绍如何利用 Hyperledger Fabric 1.4 的 Node.js SDK 开发应用程序。通过最新版本的 Fabric Node.js SDK,开发者可以更高效地构建和部署基于区块链的应用,实现数据的安全共享和交易处理。文章将涵盖环境配置、SDK 安装、示例代码以及常见问题的解决方法,帮助读者快速上手并掌握核心功能。 ... [详细]
  • 如何使用 `org.apache.tomcat.websocket.server.WsServerContainer.findMapping()` 方法及其代码示例解析 ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
  • 在Ubuntu系统中安装Android SDK的详细步骤及解决“Failed to fetch URL https://dlssl.google.com/”错误的方法
    在Ubuntu 11.10 x64系统中安装Android SDK的详细步骤,包括配置环境变量和解决“Failed to fetch URL https://dlssl.google.com/”错误的方法。本文详细介绍了如何在该系统上顺利安装并配置Android SDK,确保开发环境的稳定性和高效性。此外,还提供了解决网络连接问题的实用技巧,帮助用户克服常见的安装障碍。 ... [详细]
  • 尽管我们尽最大努力,任何软件开发过程中都难免会出现缺陷。为了更有效地提升对支持部门的协助与支撑,本文探讨了多种策略和最佳实践,旨在通过改进沟通、增强培训和支持流程来减少这些缺陷的影响,并提高整体服务质量和客户满意度。 ... [详细]
author-avatar
mobiledu2502929447
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有