grpc proxy是一个基于L7层的无状态的gRPC的etcd反向代理服务。这个L7指的是OSI模型中的第七层,会话层。它除了提供etcd client的基本功能之外,同样提供且优化了以下功能:
Watch API
grpc Pproxy提供监听机制,客户端可以监听某个key或者某些key的变更(v2和v3的机制不同,参看后面文章)。用于监听和推送变更。
它可以将多个客户端(c-watchers)对同一个key的监控合并到一个链接(s-watcher)到 etcd server的请求。同时它会广播从s-watcher收到的时间到所有的c-watchers。
如上图所示, 3个client对同一个key A的watcher,注册到gRPC proxy中,gRPC proxy会合并生成一个s-watcher 注册到etcd server。
lease API
lease API支持续约机制,客户端通过定时刷新(heartbean)来实现续约(v2和v3的实现机制也不一样)。用于集群监控以及服务注册发现。
跟上图类似,为了减少etcd server的交互次数,gRPC proxy同样提供了合并功能:
如上图所示, 3个client注册到gRPC proxy中(c-stream),通过心跳(heartbeat)来定时续约,gRPC proxy会合并生成一个s-stream 注册到etcd server。
缓存请求
gRPC proxy会缓存来自客户端的请求,保证etcd server 频繁的被客户端请求滥用。
func startGRPCProxy(cmd *cobra.Command, args []string) {
//1. 校验参数的合法性
checkArgs()
//2.判断是否校验 https
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
if tlsinfo == nil && grpcProxyListenAutoTLS {
host := []string{"https://" + grpcProxyListenAddr}
dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
autoTLS, err := transport.SelfCert(dir, host)
if err != nil {
plog.Fatal(err)
}
tlsinfo = &autoTLS
}
if tlsinfo != nil {
plog.Infof("ServerTLS: %s", tlsinfo)
}
//3.生成 cmux路由
m := mustListenCMux(tlsinfo)
//4. grpc cmux
grpcl := m.Match(cmux.HTTP2())
defer func() {
grpcl.Close()
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
}()
//5. 生成一个向etcd服务 注册的stream链接,前面提到的合并链接也就是它产生的。
client := mustNewClient()
//6. 一些性能和资源监控的封装,如Prometheus,PProf等
srvhttp, httpl := mustHTTPListener(m, tlsinfo, client)
errc := make(chan error)
//7. grpc 服务
go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
//8. http 服务
go func() { errc <- srvhttp.Serve(httpl) }()
//9. cmux serve
go func() { errc <- m.Serve() }()
//10.下面这个代码,跟上面srvhttp有些重复,只是它把监控信息给通过grpcProxyMetricsListenAddr给独立了出来
if len(grpcProxyMetricsListenAddr) > 0 {
mhttpl := mustMetricsListener(tlsinfo)
go func() {
mux := http.NewServeMux()
etcdhttp.HandlePrometheus(mux)
grpcproxy.HandleHealth(mux, client)
plog.Fatal(http.Serve(mhttpl, mux))
}()
}
// grpc-proxy is initialized, ready to serve
notifySystemd()
fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
}
在上面的代码的第6步中和第10步都是在监控服务器的性能,首先在5步中生成了一直指向etcd服务端口的client,然后启动了两个服务:
Prometheus:监控服务的资源
health: 监控etcd服务的健康状态
以上两个服务在第10步同样能生成,只要指定参数 metrics-addr
。
大概的效果,我们分别启动两个服务 $ etcd
,这个服务会默认暴露一个2379的服务端口,然后启动:./etcd grpc-proxy start --metrics-addr=http://127.0.0.1:6061 --enable-pprof=true
,这个也会默认的链接2379这个端口。
metrics资源监控
然后我们通过web浏览器分别打开 http://127.0.0.1:23790/metrics
和http://127.0.0.1:6061/metrics
出现两个一样的网页,大概的内容是:
# HELP etcd_debugging_disk_backend_commit_rebalance_duration_seconds The latency distributions of commit.rebalance called by bboltdb backend.
# TYPE etcd_debugging_disk_backend_commit_rebalance_duration_seconds histogram
etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.001"} 0
etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.002"} 0
.........
.....
health监控(etcd服务健康状况)
再打开地址 http://127.0.0.1:6061/health
和 http://127.0.0.1:23790/health
,会显示以下内容:
{"health":"true"}
这个是显示在 --endpoints
或 --discovery-srv
中所指定的etcd server是否正常存活。当终止 etcd
服务之后,再次调用地址会返回 false
。
pprof 调试
上面有个参数是 --enable-pprof=true
,当指定该参数的时候,可以打开地址 http://127.0.0.1:23790/debug/pprof/
,来分析程序性能。
/debug/pprof/
profiles:
0 block
26 goroutine
3 heap
0 mutex
12 threadcreate
full goroutine stack dump
总结一下,gRPC proxy 默认会在 --listen-addr
监控etcd服务的状态是否正常,同时也可以指定一个 metrics-addr
端口来监控服务。强调一下, --enable-pprof
这个参数只有在 listen-addr
这个地址打开才有效。
到这个地方,就到了这个章节最重要的地方,开始介绍整个gRPC proxy所提供的服务API.
通过第7步进入到具体的代码流程中,newGRPCProxyServer
,代码如下:
func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
if grpcProxyEnableOrdering {
vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
client.KV = ordering.NewKV(client.KV, vf)
lg.Info("waiting for linearized read from cluster to recover ordering")
for {
_, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
if err == nil {
break
}
lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
time.Sleep(time.Second)
}
}
if len(grpcProxyNamespace) > 0 {
client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
}
if len(grpcProxyLeasing) > 0 {
client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
}
kvp, _ := grpcproxy.NewKvProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(client)
if grpcProxyResolverPrefix != "" {
grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
}
clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
leasep, _ := grpcproxy.NewLeaseProxy(client)
mainp := grpcproxy.NewMaintenanceProxy(client)
authp := grpcproxy.NewAuthProxy(client)
electionp := grpcproxy.NewElectionProxy(client)
lockp := grpcproxy.NewLockProxy(client)
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.MaxConcurrentStreams(math.MaxUint32),
)
pb.RegisterKVServer(server, kvp)
pb.RegisterWatchServer(server, watchp)
pb.RegisterClusterServer(server, clusterp)
pb.RegisterLeaseServer(server, leasep)
pb.RegisterMaintenanceServer(server, mainp)
pb.RegisterAuthServer(server, authp)
v3electionpb.RegisterElectionServer(server, electionp)
v3lockpb.RegisterLockServer(server, lockp)
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(server)
return server
}
上面主要是通过封装 etcd的client 提供各种服务,当接受到来自用户的请求时,通过复用client连接到etcd服务,后面我们来看一下gRPC proxy所封装的各种服务。
这里提一下grpcProxyEnableOrdering和grpcProxyNamespace两个参数的意义:
grpcProxyEnableOrdering: experimental-serializable-ordering
保证grpc proxy的Revision(版本号)小于或等于etcd服务器之间的Revision(版本号),后面再解释这个版本号
grpcProxyNamespace:
为所有的key请求加上前缀空间
这个是对etcd client的 kv的封装,通过对其结构体创建方法 Newc(c *clientv3.Client)
的,我们可以看出:
func Newc(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
kv := &kvProxy{
kv: c.KV,
cache: cache.NewCache(cache.DefaultMaxEntries),
}
donec := make(chan struct{})
close(donec)
return kv, donec
}
方法上面通过一个 cache
的封装来缓存客户端请求,这个正好印证前面所说的gRPC proxy可以缓存客户端请求。我们来具体的看一下这个cache是如何处理的。
type Cache interface {
////添加查询请求到缓存中
Add(req *pb.RangeRequest, resp *pb.RangeResponse)
//从缓存中获取请求结果
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
Compact(revision int64)
//判断缓存是否失效
Invalidate(key []byte, endkey []byte)
//缓存长度
Size() int
Close()
}
cache接口主要提供以上的方法来缓存来子客户端的请求信息,看一下具体的cache类:
// cache implements Cache
type cache struct {
mu sync.RWMutex
lru *lru.Cache
// a reverse index for cache invalidation
cachedRanges adt.IntervalTree
compactedRev int64
}
上面有个lru的缓存信息(算法为最近最少未使用),同时实现了一个IntervalTree(线段树),用来缓存范围查询,具体的算法可以查看对应的源码。
总的来说就是在client的kv上面封装了一层代码,加上了一层cache。
同上,这个也是封装了client的watch API,如上面所示,gRPC proxy 支持watch的链接复用,
type watchProxy struct {
cw clientv3.Watcher
ctx context.Context
leader *leader
ranges *watchRanges
// mu protects adding outstanding watch servers through wg.
mu sync.Mutex
// wg waits until all outstanding watch servers quit.
wg sync.WaitGroup
// kv is used for permission checking
kv clientv3.KV
}
上面这个结构体实际上很明显能够告诉我们,如果watchProxy能够复用连接,那一定是在watchRanges中实现的。
我们先看一下watchProxy.Watch方法 由于方法太长,所以我简略的说下步骤:
1. 检查watchProxy是否退出
2. wp.wg.Add(1): watch servers +1
3.生成一个watchProxyStream结构体
4.再次判断leader是否丢失链接
5.循环判断watchProxyStream的recvLoop以及sendLoop 方法
首先我们要知道leader的作用是什么?
打开 leader.go
文件中的发现有一个叫 recvLoop()
的方法,**这个方法的作用实际上就是通过对一个key(__lostleader)的监视来定时的判断client是否失效。**
所以实际上我们的真正的业务逻辑在watchProxyStream这个结构体中中。
先来看一下 watchProxyStream.recvLoop()这个方法:
func (wps *watchProxyStream) recvLoop() error {
for {
req, err := wps.stream.Recv()
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
cr := uv.CreateRequest
if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
// Return WatchResponse which is caused by permission checking if and only if
// the error is permission denied. For other errors (e.g. timeout or connection closed),
// the permission checking mechanism should do nothing for preserving error code.
wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
continue
}
w := &watcher{
wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
id: wps.nextWatcherID,
wps: wps,
nextrev: cr.StartRevision,
progress: cr.ProgressNotify,
prevKV: cr.PrevKv,
filters: v3rpc.FiltersFromRequest(cr),
}
if !w.wr.valid() {
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
continue
}
wps.nextWatcherID++
w.nextrev = cr.StartRevision
wps.watchers[w.id] = w
wps.ranges.add(w)
case *pb.WatchRequest_CancelRequest:
wps.delete(uv.CancelRequest.WatchId)
default:
panic("not implemented")
}
}
}
首先我们来明确一下 wps.stream这个流是属于与gRPC proxy连接的客户端。所以当收到来自客户端 WatchRequest_CreateRequest
请求时,会创建一个 watcher
,同时会在 wps.watchers
以及 wps.ranges
中添加该watcher,并且在收到这个请求的时候用 checkPermissionForWatch
会向etcd server 同时发起一个服务,判断是否允许接入链接。在收到来自etcd server的正确回答之后,会在 ranges
中 add
这个方法在加载这个watcher之后同样会向etcd server 发起请求,并且得到应答之后会广播,这里面实现了一个broadcast的结构体用来做广播。
上面说的这个ranges实际上是来源于watchProxy这个结构体,而watchers来源于同一个stream中,这说明,对于客户端来说,它同样可以复用stream流来处理watch。
我们总结一下,对一来自于同一个客户端的的watch是它的stream可以复用,不同客户端的链接都会被同一链接复用,gRPC只有在收到来之客户端stream的 WatchRequest_CreateRequest
请求的时候才会向 etcd server 发起请求。
我门先来看一下lease proxy的源码:
type leaseProxy struct {
// leaseClient handles req from LeaseGrant() that requires a lease ID.
leaseClient pb.LeaseClient
lessor clientv3.Lease
ctx context.Context
leader *leader
// mu protects adding outstanding leaseProxyStream through wg.
mu sync.RWMutex
// wg waits until all outstanding leaseProxyStream quit.
wg sync.WaitGroup
}
实际上大致的内容大同小异,leader的作用跟上面类似,leaseClient 和 lessor继承于clientv3,后面介绍。在接收到lease请求的时候,会生成一个 leaseProxyStream
结构体,这个结构体有三个方法 recvLoop
和 sendLoop
和上面类似,同样有一个 keepAliveLoop
,该方法是 核心方法,它是通过的一个 TTL的时间定时去跟etcd server 续约。
gRPC proxy 基本上实现etcd client的那一套API,通过对clientv3.Client的封装。
gRPC proxy 通过对 etcd client 的封装,实现了与etcd连接,不同客户端请求复用同一个client,其目的是为了加少etcd server的负载。这个目的也决定了proxy的使用场景,即降低 etcd
负载。