热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ETCD组件在grpc中的实践

一、前言grpc中没有像go-micro那样集成可插拔式的etcd库使用,如何使得grpc能够使用服务注册发现及命名解析的功能,因此本文基于etcd实现了NameResolver。

一、前言

grpc中没有像go-micro那样集成可插拔式的etcd库使用,如何使得grpc能够使用服务注册发现及命名解析的功能,因此本文基于etcd实现了Name Resolver。

 

二、所需的grpc版本及高版本grpc、protobuf与etcd兼容问题

grpc相关库:

google.golang.org/grpc v1.26.0

google.golang.org/grpc/balancer/roundrobin

google.golang.org/grpc/resolver

 

etcd相关库:

go.etcd.io/etcd/clientv3

github.com/coreos/etcd/mvcc/mvccpb

 

此处需要注意的是,新版本grpc不兼容etcd相关库, 如果grpc版本大于1.26.0或者protobuf版本过高会出现以下问题:

1. grpc版本过高,新版本不支持etcd  需降级

# github.com/coreos/etcd/clientv3/balancer/picker
../../vendor/github.com/coreos/etcd/clientv3/balancer/picker/err.go:37:44: undefined: balancer.PickOptions
../../vendor/github.com/coreos/etcd/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
../../vendor/github.com/coreos/etcd/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption
../../vendor/github.com/coreos/etcd/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption

解决办法:在go.mod 加入:replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

2. grpc版本过低  protobuf版本过高

# ut-blogger/api/protos/user
../../api/protos/user/user.pb.go:327:11: undefined: grpc.SupportPackageIsVersion6
../../api/protos/user/user.pb.go:338:5: undefined: grpc.ClientConnInterface

解决办法:降级protoc-gen-go的版本 go get [email protected] 重新生成proto

 

三、自定义实现naming

package etcdservice
import (
"context"
"go.etcd.io/etcd/clientv3"
"log"
"strings"
"time"
)
const Schema = "grpcEtcd"
// Register 注册地址到ETCD组件中 使用 ; 分割
func Register(etcdAddr, name string, addr string, ttl int64) error {
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
log.Printf("connect to etcd err:%s", err)
return err
}
}
ticker := time.NewTicker(time.Second * time.Duration(ttl))
go func() {
for {
getResp, err := cli.Get(context.Background(), "/"+Schema+"/"+name+"/"+addr)
if err != nil {
log.Printf("getResp:%+v\n", getResp)
log.Printf("Register:%s", err)
} else if getResp.Count == 0 {
err = withAlive(name, addr, ttl)
if err != nil {
log.Printf("keep alive:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
// withAlive 创建租约
func withAlive(name string, addr string, ttl int64) error {
leaseResp, err := cli.Grant(context.Background(), ttl)
if err != nil {
return err
}
log.Printf("key:%v\n", "/"+Schema+"/"+name+"/"+addr)
_, err = cli.Put(context.Background(), "/"+Schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Printf("put etcd error:%s", err)
return err
}
ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Printf("keep alive error:%s", err)
return err
}
// 清空 keep alive 返回的channel
go func() {
for {
<-ch
}
}()
return nil
}
// UnRegister remove service from etcd
func UnRegister(name string, addr string) {
if cli != nil {
cli.Delete(context.Background(), "/"+Schema+"/"+name+"/"+addr)
}
}

 

四、自定义实现resolver

package etcdservice
import (
"context"
"log"
"strings"
"time"
"go.etcd.io/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
)
var cli *clientv3.Client
// etcdResolver 解析struct
type etcdResolver struct {
rawAddr string
cc resolver.ClientConn
}
// NewResolver initialize an etcd client
func NewResolver(etcdAddr string) resolver.Builder {
return &etcdResolver{rawAddr: etcdAddr}
}
// Build 构建etcd client
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var err error
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(r.rawAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
return nil, err
}
}
r.cc = cc
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil
}
// Scheme etcd resolve scheme
func (r etcdResolver) Scheme() string {
return Schema
}
// ResolveNow
func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
log.Println("ResolveNow")
}
// Close closes the resolver
func (r etcdResolver) Close() {
log.Println("Close")
}
// watch 监听resolve列表变化
func (r *etcdResolver) watch(keyPrefix string) {
var addrList []resolver.Address
getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Println(err)
} else {
for i := range getResp.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
}
}
// 新版本etcd去除了NewAddress方法 以UpdateState代替
r.cc.UpdateState(resolver.State{Addresses: addrList})
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exist(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
case mvccpb.DELETE:
if s, ok := remove(addrList, addr); ok {
addrList = s
r.cc.UpdateState(resolver.State{Addresses: addrList})
}
}
log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
// exist 判断resolve address是否存在
func exist(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
// remove 从resolver列表移除
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}

 

五、服务端调用ETCD服务注册

EtcdAddr、ServiceName和Ttl可以从配置文件读取,读取配置文件的方式很多,本文不在此阐述

//将服务地址注册到etcd中
go etcdservice.Register(s.o.EtcdAddr, s.o.ServiceName, addr, s.o.Ttl)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
sig := <-ch
etcdservice.UnRegister(s.o.ServiceName, addr)
if i, ok := sig.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}()

 

六、 客户端grpc服务 ectd服务发现

客户端调用grpc服务以serviceName的形式从etcd中获取服务节点,此处采用roundrobin的形式作为负载均衡

//注册etcd解析器
r := etcdservice.NewResolver(o.EtcdAddr)
resolver.Register(r)
// 客户端连接服务器
conn, err := grpc2.Dial(r.Scheme()+"://"+o.Caller+"/"+o.Callee, grpc2.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), grpc2.WithInsecure())
if err != nil {
log.Println("连接服务器失败", err)
return nil, errors.Wrap(err, "notify client dial error")
}

 



推荐阅读
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了logistic回归(线性和非线性)相关的知识,包括线性logistic回归的代码和数据集的分布情况。希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了如何使用PHP向系统日历中添加事件的方法,通过使用PHP技术可以实现自动添加事件的功能,从而实现全局通知系统和迅速记录工具的自动化。同时还提到了系统exchange自带的日历具有同步感的特点,以及使用web技术实现自动添加事件的优势。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
author-avatar
就是!有梦想
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有