热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ETCD数据库源码分析——服务端PUT流程

在ETCD数据库源码分析——etcdgRPC服务API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。Regis

在ETCD数据库源码分析——etcd gRPC 服务 API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。
在这里插入图片描述
RegisterKVServer函数定义在/api/etcdserverpb/rpc.pb.go文件中,这里pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体。

func RegisterKVServer(s *grpc.Server, srv KVServer) {s.RegisterService(&_KV_serviceDesc, srv)
}

在/api/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到Put方法对应的函数_KV_Put_Handler。
在这里插入图片描述
从_KV_Put_Handler函数可以看到其最重要的语句就是srv.(KVServer).Put(ctx, in),其实就是调用的quotaKVServer.(KVServer).Put(ctx, in)

func _KV_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(PutRequest)if err := dec(in); err != nil { return nil, err }if interceptor == nil { return srv.(KVServer).Put(ctx, in) }info := &grpc.UnaryServerInfo{Server: srv,FullMethod: "/etcdserverpb.KV/Put",}handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(KVServer).Put(ctx, req.(*PutRequest)) }return interceptor(ctx, in, info, handler)
}

因此这里从NewQuotaKVServer看出,quotaKVServer和pb.KVServer接口是组合关系,NewQuotaKVServer函数中调用NewKVServer函数初始化继承了pb.KVServer接口的kvServer结构体。

// server/etcdserver/api/v3rpc/quota.go
type quotaKVServer struct {pb.KVServerqa quotaAlarmer
}
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {return &quotaKVServer{NewKVServer(s),quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},}
}

etcdserver.RaftKV接口继承了KVServer接口。而pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体,所以这里的etcdserver.RaftKV指向的就是etcdserver结构体。

type kvServer struct {hdr header // 用于填充响应消息的头信息kv etcdserver.RaftKV// maxTxnOps is the max operations per txn.// e.g suppose maxTxnOps = 128.// Txn.Success can have at most 128 operations,// and Txn.Failure can have at most 128 operations.maxTxnOps uint
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {if err := checkPutRequest(r); err != nil { return nil, err }resp, err := s.kv.Put(ctx, r)if err != nil { return nil, togRPCError(err) }s.hdr.fill(resp.Header)return resp, nil
}

kvServer结构体Put函数处理流程如下:首先会对请求消息进行各方面的检查,检查完之后会将所有的请求交给其内封装的RaftKV接口进行处理,待处理完成得到响应消息之后,会通过header.fill()方法填充响应的头信息,最后将完整的响应消息返回给客户端。因此,往下追踪 srv.(KVServer).Put(ctx, in)其实就是调用(s *EtcdServer) Put()函数。

// server/etcdserver/v3_server.go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})if err != nil { return nil, err }return resp.(*pb.PutResponse), nil
}
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {return s.raftRequestOnce(ctx, r)
}
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {result, err := s.processInternalRaftRequestOnce(ctx, r)if err != nil { return nil, err }if result.Err != nil { return nil, result.Err }if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {applyStart := result.Trace.GetStartTime()// The trace object is created in toApply. Here reset the start time to trace// the raft request time by the difference between the request start time// and toApply start timeresult.Trace.SetStartTime(startTime)result.Trace.InsertStep(0, applyStart, "process raft request")result.Trace.LogIfLong(traceThreshold)}return result.Resp, nil
}

从上面流程看出最终调用的是 (s *EtcdServer) processInternalRaftRequestOnce(…)函数,在该函数里面有一句关键调用 s.r.Propose(cctx, data)。s是EtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了。

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {ai :&#61; s.getAppliedIndex()ci :&#61; s.getCommittedIndex()// 如果apply和commit的raft log差距太大需要拒绝客户的请求if ci > ai&#43;maxGapBetweenApplyAndCommitIndex { return nil, errors.ErrTooManyRequests }r.Header &#61; &pb.RequestHeader{ ID: s.reqIDGen.Next(), } // 利用reqIDGen产生递增标号// check authinfo if it is not InternalAuthenticateRequest 对于非InternalAuthenticateRequest的请求&#xff0c;需要检查authinfo的信息if r.Authenticate &#61;&#61; nil {authInfo, err :&#61; s.AuthInfoFromCtx(ctx)if err !&#61; nil { return nil, err }if authInfo !&#61; nil {r.Header.Username &#61; authInfo.Usernamer.Header.AuthRevision &#61; authInfo.Revision}}data, err :&#61; r.Marshal()if err !&#61; nil { return nil, err }if len(data) > int(s.Cfg.MaxRequestBytes) { return nil, errors.ErrRequestTooLarge } // 发送的数据太大id :&#61; r.IDif id &#61;&#61; 0 { id &#61; r.Header.ID }ch :&#61; s.w.Register(id) // Register waits returns a chan that waits on the given ID// The chan will be triggered when Trigger is called with the same ID.// 这里将id注册到etcdsever结构体中的Wait中&#xff0c;等待Wait.Trigger(id, nil)// 正常情况下当raft log被应用后会调用Wait.Trigger(id, nil)触发管道cctx, cancel :&#61; context.WithTimeout(ctx, s.Cfg.ReqTimeout())defer cancel()start :&#61; time.Now()err &#61; s.r.Propose(cctx, data) // 调用raft模块的s.r.Propose(cctx, data)if err !&#61; nil {proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, err}proposalsPending.Inc()defer proposalsPending.Dec()select {case x :&#61; <-ch: // Wait.Trigger(id, nil)管道被触发return x.(*apply2.Result), nilcase <-cctx.Done():proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, s.parseProposeCtxErr(cctx.Err(), start)case <-s.done:return nil, errors.ErrStopped}
}

欢迎关注微信公众号肥叔菌&#xff0c;最新内容优先在微信公众号发布。


推荐阅读
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文介绍了在go语言中利用(*interface{})(nil)传递参数类型的原理及应用。通过分析Martini框架中的injector类型的声明,解释了values映射表的作用以及parent Injector的含义。同时,讨论了该技术在实际开发中的应用场景。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • Go语言实现堆排序的详细教程
    本文主要介绍了Go语言实现堆排序的详细教程,包括大根堆的定义和完全二叉树的概念。通过图解和算法描述,详细介绍了堆排序的实现过程。堆排序是一种效率很高的排序算法,时间复杂度为O(nlgn)。阅读本文大约需要15分钟。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
author-avatar
Panzerkampfwagen-VI_238
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有