高可用的分布式key-value存储,可用于配置共享和服务发现,类似项目有zookeeper
提供restful的http接口,使用简单。基于raft算法(主从、选举等)的强一致性,高可用的服务存储目录
应用场景:服务发现和服务注册,配置中心,分布式锁,master选举
参考:
https://coreos.com/etcd/docs/latest/demo.html
https://github.com/etcd-io/etcd
1、环境变量设置ETCDCTL_API=3
2、启动etcd:
./etcd.exe
提示已在指定ip和端口启动
3、通过使用 etcdctl 来和已经启动的集群交互:
./etcdctl.exe --write-out=table --endpoints=localhost:2379 member list
多台机器集群参考http://etcd.doczh.cn/documentation/op-guide/clustering.html
启动etcd.exe后
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"log"
"strconv"
"time"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10 * time.Second,
}
cli, err := clientv3.New(config)
if err != nil {
panic(err)
}
defer cli.Close()
go put(cli)
rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
func put(cli *clientv3.Client) {
num := 1
for {
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := cli.Put(ctx, "foo", strconv.Itoa(num))
cancel()
if err != nil {
log.Fatal(err)
}
num += 1
}
}
//output
//PUT "foo" : "1"
//PUT "foo" : "2"
//PUT "foo" : "3"
//PUT "foo" : "4"
//。。。
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
Txn(ctx context.Context) Txn
}
kv
//put操作
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
log.Fatal(err)
}
----------
//处理put的错误情况
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "", "sample_value")
cancel()
if err != nil {
switch err {
case context.Canceled:
fmt.Printf("ctx is canceled by another routine: %vn", err)
case context.DeadlineExceeded:
fmt.Printf("ctx is attached with a deadline is exceeded: %vn", err)
case rpctypes.ErrEmptyKey:
fmt.Printf("client-side error: %vn", err)
default:
fmt.Printf("bad cluster endpoints, which are not etcd servers: %vn", err)
}
}
// Output: client-side error: etcdserver: key is not provided
----------
//get操作
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %sn", ev.Key, ev.Value)
}
// Output: foo : bar
----------
//get某key的某个历史版本
//presp.Header.Revision是int64类型,可以直接填入int64(1)、int64(2)等等获取对应历史版本
presp, err := cli.Put(context.TODO(), "foo", "bar1")
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar2")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %sn", ev.Key, ev.Value)
}
// Output: foo : bar1
----------
//排序取出
for i := range make([]int, 3) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")
cancel()
if err != nil {
log.Fatal(err)
}
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
//获得以"key"为前缀的所有key,并以key为排序依据逆序取出
resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %sn", ev.Key, ev.Value)
}
// Output:
// key_2 : value
// key_1 : value
// key_0 : value
----------
//delete
// count keys about to be deleted
gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
// delete the keys
dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
fmt.Println("Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)
// Output:
// Deleted all keys: true
----------
//compact,删除某版本之前的历史修改
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
compRev := resp.Header.Revision // 选择一个版本,删除其之前的版本
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Compact(ctx, compRev)
cancel()
if err != nil {
log.Fatal(err)
}
----------
//txn,将几个请求放入一个事务中执行
kvc := clientv3.NewKV(cli)
_, err = kvc.Put(context.TODO(), "key", "xyz")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Txn(ctx).
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
Then(clientv3.OpPut("key", "XYZ")). // 因为"xyz" > "abc"所以执行了这一条,修改为"XYZ"
Else(clientv3.OpPut("key", "ABC")). // 不执行
Commit()
cancel()
if err != nil {
log.Fatal(err)
}
----------
//do,不采用事务,执行多条指令
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
watch
func Example() {
config := clientv3.Config{
Endpoints:[]string{"127.0.0.1:2379"},
DialTimeout:10*time.Second,
}
cli,err := clientv3.New(config)
if err != nil {panic(err)}
defer cli.Close()
------------------------------------------------------
//watch基本操作,监控对key的put、delete操作,并返回新值。put的值和之前相同也会返回
rch := cli.Watch(context.Background(), "foo") //返回一个channel类型
for wresp := range rch { //阻塞直到rch有值,然后等待下一个操作
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
// PUT "foo" : "bar"
}
----------
//watch以"foo"为前缀的key
rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
----------
// watche在['foo1', 'foo4')范围内的key。具体为foo1、foo2、foo3。
rch := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4"))
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
----------
//ProgressNotify
rch := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())
wresp := <-rch
fmt.Printf("wresp.Header.Revision: %dn", wresp.Header.Revision)
fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify())
// wresp.Header.Revision: 0
// wresp.IsProgressNotify: true
// IsProgressNotify returns true if the WatchResponse is progress notification
其他
func Examples() {
cli, err := clientv3.New(clientv3.Config{
Endpoints:[]string{"127.0.0.1:2379"},
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
//cluster操作
---------------
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Println("members:", len(resp.Members))
// Output: members: 3
---------------
peerURLs := endpoints[2:]
mresp, err := cli.MemberAdd(context.Background(), peerURLs)
if err != nil {
log.Fatal(err)
}
fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs)
// added member.PeerURLs: [http://localhost:32380]
--------------
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
_, err = cli.MemberRemove(context.Background(), resp.Members[0].ID)
if err != nil {
log.Fatal(err)
}
-------------
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
peerURLs := []string{"http://localhost:12380"}
_, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
if err != nil {
log.Fatal(err)
}
-------------------------------------------------------------
//lease,超时过期的key
-------------
// minimum lease TTL is 5-second
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// after 5 seconds, the key 'foo' will be removed
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
-------------
//不必等超时,立即使和resp.ID相关的key无效
_, err = cli.Revoke(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
-------------
// 与resp.ID关联的key永久有效
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
}
-------------------------------------------------------------
//maintenance,维护
-------------
resp, err := cli.Status(context.Background(), "127.0.0.1:2379")
if err != nil {
log.Fatal(err)
}
fmt.Printf("endpoint: %s / Leader: %vn", "127.0.0.1:2379", resp.Header.MemberId == resp.Leader)
//输出:endpoint: 127.0.0.1:2379 / Leader: true
-------------
//删除大量key之后可以用此去除垃圾空间
if _, err = cli.Defragment(context.TODO(), 127.0.0.1:2379); err != nil {
log.Fatal(err)
}
------------------------------------------------------------
//metrics
//待续。。。