在ETCD数据库源码分析——etcd gRPC 服务 API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。
RegisterKVServer函数定义在/api/etcdserverpb/rpc.pb.go文件中,这里pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))
中的s指的就是ETCD数据库中的最重要的etcdserver结构体。
func RegisterKVServer(s *grpc.Server, srv KVServer) {s.RegisterService(&_KV_serviceDesc, srv)
}
在/api/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到Put方法对应的函数_KV_Put_Handler。
从_KV_Put_Handler函数可以看到其最重要的语句就是srv.(KVServer).Put(ctx, in)
,其实就是调用的quotaKVServer.(KVServer).Put(ctx, in)
。
func _KV_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(PutRequest)if err := dec(in); err != nil { return nil, err }if interceptor == nil { return srv.(KVServer).Put(ctx, in) }info := &grpc.UnaryServerInfo{Server: srv,FullMethod: "/etcdserverpb.KV/Put",}handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(KVServer).Put(ctx, req.(*PutRequest)) }return interceptor(ctx, in, info, handler)
}
因此这里从NewQuotaKVServer看出,quotaKVServer和pb.KVServer接口是组合关系,NewQuotaKVServer函数中调用NewKVServer函数初始化继承了pb.KVServer接口的kvServer结构体。
type quotaKVServer struct {pb.KVServerqa quotaAlarmer
}
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {return "aKVServer{NewKVServer(s),quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},}
}
etcdserver.RaftKV接口继承了KVServer接口。而pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))
中的s指的就是ETCD数据库中的最重要的etcdserver结构体,所以这里的etcdserver.RaftKV指向的就是etcdserver结构体。
type kvServer struct {hdr header kv etcdserver.RaftKVmaxTxnOps uint
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {if err := checkPutRequest(r); err != nil { return nil, err }resp, err := s.kv.Put(ctx, r)if err != nil { return nil, togRPCError(err) }s.hdr.fill(resp.Header)return resp, nil
}
kvServer结构体Put函数处理流程如下:首先会对请求消息进行各方面的检查,检查完之后会将所有的请求交给其内封装的RaftKV接口进行处理,待处理完成得到响应消息之后,会通过header.fill()方法填充响应的头信息,最后将完整的响应消息返回给客户端。因此,往下追踪 srv.(KVServer).Put(ctx, in)其实就是调用(s *EtcdServer) Put()函数。
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})if err != nil { return nil, err }return resp.(*pb.PutResponse), nil
}
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {return s.raftRequestOnce(ctx, r)
}
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {result, err := s.processInternalRaftRequestOnce(ctx, r)if err != nil { return nil, err }if result.Err != nil { return nil, result.Err }if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {applyStart := result.Trace.GetStartTime()result.Trace.SetStartTime(startTime)result.Trace.InsertStep(0, applyStart, "process raft request")result.Trace.LogIfLong(traceThreshold)}return result.Resp, nil
}
从上面流程看出最终调用的是 (s *EtcdServer) processInternalRaftRequestOnce(…)函数,在该函数里面有一句关键调用 s.r.Propose(cctx, data)。s是EtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了。
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {ai :&#61; s.getAppliedIndex()ci :&#61; s.getCommittedIndex()if ci > ai&#43;maxGapBetweenApplyAndCommitIndex { return nil, errors.ErrTooManyRequests }r.Header &#61; &pb.RequestHeader{ ID: s.reqIDGen.Next(), } if r.Authenticate &#61;&#61; nil {authInfo, err :&#61; s.AuthInfoFromCtx(ctx)if err !&#61; nil { return nil, err }if authInfo !&#61; nil {r.Header.Username &#61; authInfo.Usernamer.Header.AuthRevision &#61; authInfo.Revision}}data, err :&#61; r.Marshal()if err !&#61; nil { return nil, err }if len(data) > int(s.Cfg.MaxRequestBytes) { return nil, errors.ErrRequestTooLarge } id :&#61; r.IDif id &#61;&#61; 0 { id &#61; r.Header.ID }ch :&#61; s.w.Register(id) cctx, cancel :&#61; context.WithTimeout(ctx, s.Cfg.ReqTimeout())defer cancel()start :&#61; time.Now()err &#61; s.r.Propose(cctx, data) if err !&#61; nil {proposalsFailed.Inc()s.w.Trigger(id, nil) return nil, err}proposalsPending.Inc()defer proposalsPending.Dec()select {case x :&#61; <-ch: return x.(*apply2.Result), nilcase <-cctx.Done():proposalsFailed.Inc()s.w.Trigger(id, nil) return nil, s.parseProposeCtxErr(cctx.Err(), start)case <-s.done:return nil, errors.ErrStopped}
}
欢迎关注微信公众号肥叔菌&#xff0c;最新内容优先在微信公众号发布。