热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ETCD数据库源码分析——服务端PUT流程

在ETCD数据库源码分析——etcdgRPC服务API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。Regis

在ETCD数据库源码分析——etcd gRPC 服务 API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。
在这里插入图片描述
RegisterKVServer函数定义在/api/etcdserverpb/rpc.pb.go文件中,这里pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体。

func RegisterKVServer(s *grpc.Server, srv KVServer) {s.RegisterService(&_KV_serviceDesc, srv)
}

在/api/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到Put方法对应的函数_KV_Put_Handler。
在这里插入图片描述
从_KV_Put_Handler函数可以看到其最重要的语句就是srv.(KVServer).Put(ctx, in),其实就是调用的quotaKVServer.(KVServer).Put(ctx, in)

func _KV_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(PutRequest)if err := dec(in); err != nil { return nil, err }if interceptor == nil { return srv.(KVServer).Put(ctx, in) }info := &grpc.UnaryServerInfo{Server: srv,FullMethod: "/etcdserverpb.KV/Put",}handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(KVServer).Put(ctx, req.(*PutRequest)) }return interceptor(ctx, in, info, handler)
}

因此这里从NewQuotaKVServer看出,quotaKVServer和pb.KVServer接口是组合关系,NewQuotaKVServer函数中调用NewKVServer函数初始化继承了pb.KVServer接口的kvServer结构体。

// server/etcdserver/api/v3rpc/quota.go
type quotaKVServer struct {pb.KVServerqa quotaAlarmer
}
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {return &quotaKVServer{NewKVServer(s),quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},}
}

etcdserver.RaftKV接口继承了KVServer接口。而pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体,所以这里的etcdserver.RaftKV指向的就是etcdserver结构体。

type kvServer struct {hdr header // 用于填充响应消息的头信息kv etcdserver.RaftKV// maxTxnOps is the max operations per txn.// e.g suppose maxTxnOps = 128.// Txn.Success can have at most 128 operations,// and Txn.Failure can have at most 128 operations.maxTxnOps uint
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {if err := checkPutRequest(r); err != nil { return nil, err }resp, err := s.kv.Put(ctx, r)if err != nil { return nil, togRPCError(err) }s.hdr.fill(resp.Header)return resp, nil
}

kvServer结构体Put函数处理流程如下:首先会对请求消息进行各方面的检查,检查完之后会将所有的请求交给其内封装的RaftKV接口进行处理,待处理完成得到响应消息之后,会通过header.fill()方法填充响应的头信息,最后将完整的响应消息返回给客户端。因此,往下追踪 srv.(KVServer).Put(ctx, in)其实就是调用(s *EtcdServer) Put()函数。

// server/etcdserver/v3_server.go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})if err != nil { return nil, err }return resp.(*pb.PutResponse), nil
}
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {return s.raftRequestOnce(ctx, r)
}
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {result, err := s.processInternalRaftRequestOnce(ctx, r)if err != nil { return nil, err }if result.Err != nil { return nil, result.Err }if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {applyStart := result.Trace.GetStartTime()// The trace object is created in toApply. Here reset the start time to trace// the raft request time by the difference between the request start time// and toApply start timeresult.Trace.SetStartTime(startTime)result.Trace.InsertStep(0, applyStart, "process raft request")result.Trace.LogIfLong(traceThreshold)}return result.Resp, nil
}

从上面流程看出最终调用的是 (s *EtcdServer) processInternalRaftRequestOnce(…)函数,在该函数里面有一句关键调用 s.r.Propose(cctx, data)。s是EtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了。

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {ai :&#61; s.getAppliedIndex()ci :&#61; s.getCommittedIndex()// 如果apply和commit的raft log差距太大需要拒绝客户的请求if ci > ai&#43;maxGapBetweenApplyAndCommitIndex { return nil, errors.ErrTooManyRequests }r.Header &#61; &pb.RequestHeader{ ID: s.reqIDGen.Next(), } // 利用reqIDGen产生递增标号// check authinfo if it is not InternalAuthenticateRequest 对于非InternalAuthenticateRequest的请求&#xff0c;需要检查authinfo的信息if r.Authenticate &#61;&#61; nil {authInfo, err :&#61; s.AuthInfoFromCtx(ctx)if err !&#61; nil { return nil, err }if authInfo !&#61; nil {r.Header.Username &#61; authInfo.Usernamer.Header.AuthRevision &#61; authInfo.Revision}}data, err :&#61; r.Marshal()if err !&#61; nil { return nil, err }if len(data) > int(s.Cfg.MaxRequestBytes) { return nil, errors.ErrRequestTooLarge } // 发送的数据太大id :&#61; r.IDif id &#61;&#61; 0 { id &#61; r.Header.ID }ch :&#61; s.w.Register(id) // Register waits returns a chan that waits on the given ID// The chan will be triggered when Trigger is called with the same ID.// 这里将id注册到etcdsever结构体中的Wait中&#xff0c;等待Wait.Trigger(id, nil)// 正常情况下当raft log被应用后会调用Wait.Trigger(id, nil)触发管道cctx, cancel :&#61; context.WithTimeout(ctx, s.Cfg.ReqTimeout())defer cancel()start :&#61; time.Now()err &#61; s.r.Propose(cctx, data) // 调用raft模块的s.r.Propose(cctx, data)if err !&#61; nil {proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, err}proposalsPending.Inc()defer proposalsPending.Dec()select {case x :&#61; <-ch: // Wait.Trigger(id, nil)管道被触发return x.(*apply2.Result), nilcase <-cctx.Done():proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, s.parseProposeCtxErr(cctx.Err(), start)case <-s.done:return nil, errors.ErrStopped}
}

欢迎关注微信公众号肥叔菌&#xff0c;最新内容优先在微信公众号发布。


推荐阅读
  • 本文介绍了 Go 语言中的高性能、可扩展、轻量级 Web 框架 Echo。Echo 框架简单易用,仅需几行代码即可启动一个高性能 HTTP 服务。 ... [详细]
  • python模块之正则
    re模块可以读懂你写的正则表达式根据你写的表达式去执行任务用re去操作正则正则表达式使用一些规则来检测一些字符串是否符合个人要求,从一段字符串中找到符合要求的内容。在 ... [详细]
  • 目录预备知识导包构建数据集神经网络结构训练测试精度可视化计算模型精度损失可视化输出网络结构信息训练神经网络定义参数载入数据载入神经网络结构、损失及优化训练及测试损失、精度可视化qu ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • 本文将带你快速了解 SpringMVC 框架的基本使用方法,通过实现一个简单的 Controller 并在浏览器中访问,展示 SpringMVC 的强大与简便。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 兆芯X86 CPU架构的演进与现状(国产CPU系列)
    本文详细介绍了兆芯X86 CPU架构的发展历程,从公司成立背景到关键技术授权,再到具体芯片架构的演进,全面解析了兆芯在国产CPU领域的贡献与挑战。 ... [详细]
  • 使用方法:将要控制的角色拖到TargetBody,将相机的焦点拖到CamerPivot,,建议CameraPivot是一个放在TargetBody下的子物体,并且位置应该是在Tar ... [详细]
  • 本文将介绍如何在混合开发(Hybrid)应用中实现Native与HTML5的交互,包括基本概念、学习目标以及具体的实现步骤。 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文详细介绍了Java反射机制的基本概念、获取Class对象的方法、反射的主要功能及其在实际开发中的应用。通过具体示例,帮助读者更好地理解和使用Java反射。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
author-avatar
Panzerkampfwagen-VI_238
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有