ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] # go操作etcd ~~~ go get -u go.etcd.io/etcd ~~~ 在import的时候 应该import “go.etcd.io/etcd/clientv3” 而不是 "github.com/coreos/etcd/clientv3" ## 连接 ~~~ import ( "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client err error ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } defer client.Close() fmt.Println("连接成功") } ~~~ ## kv设置 ~~~ package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error kv clientv3.KV //操作kv的对象 putResp *clientv3.PutResponse //设置kv的对象 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //用于读写etcd的键值对 kv = clientv3.NewKV(client) //这里面有上下文,可以用来取消他 //第三个参数可选,clientv3.WithPrevKV()表示可以查到以前的kv if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "hello", clientv3.WithPrevKV()); err != nil { fmt.Println(err) } else { //每次操作有个唯一的Revision,单调递增 fmt.Println("Revision: ", putResp.Header.Revision) //打印之前的值 if putResp.PrevKv != nil { fmt.Println("之前的值: ", string(putResp.PrevKv.Key), "---", string(putResp.PrevKv.Value)) } } } ~~~ ## kv读取 ~~~ package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error kv clientv3.KV //操作kv的对象 getResp *clientv3.GetResponse //获取kv的对象 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //用于读写etcd的键值对 kv = clientv3.NewKV(client) //第三个参数也是可选的 clientv3. if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil { fmt.Println(err) } else { //打印出来的create_revision是创建版本,mod_revision是修改版本, version是修改的次数 fmt.Println(getResp.Kvs) } } ~~~ ## 以什么为前缀查找 ~~~ func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error kv clientv3.KV //操作kv的对象 getResp *clientv3.GetResponse //获取kv的对象 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //用于读写etcd的键值对 kv = clientv3.NewKV(client) //第三个参数也是可选的 clientv3. //以什么为前缀的 if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil { fmt.Println(err) } else { //数组 //打印出来的create_revision是创建版本,mod_revision是修改版本, version是修改的次数 fmt.Println(getResp.Kvs) for k, v := range getResp.Kvs { fmt.Println(k) fmt.Println(v) } } } ~~~ ## 删除key ~~~ func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error kv clientv3.KV //操作kv的对象 deleteResp *clientv3.DeleteResponse //删除的对象 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //用于读写etcd的键值对 kv = clientv3.NewKV(client) //删除 //第三个参数可选,clientv3.WithPrevKV()表示会赋值deleteResp.PrevKvs if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil { fmt.Println(err) return } //被删除之前的k和v,上面第三个参数要设置,否则是没有的 if len(deleteResp.PrevKvs) != 0 { for _, v := range deleteResp.PrevKvs { fmt.Println(string(v.Key)) fmt.Println(string(v.Value)) } } } ~~~ ## 删除多个key,以什么为前缀 ~~~ if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil { fmt.Println(err) return } ~~~ ## 删除连续的2个key ~~~ if deleteResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2)); err != nil { fmt.Println(err) return } ~~~ ## 租约 ~~~ import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error lease clientv3.Lease //租约对象 leaseGrantResp *clientv3.LeaseGrantResponse //申请到的租约 leaseId clientv3.LeaseID //租约id putResp *clientv3.PutResponse //PUT对象 getResp *clientv3.GetResponse //get对象 kv clientv3.KV //kv操作对象 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //申请一个租约(lease) lease = clientv3.NewLease(client) //申请一个3秒的租约 if leaseGrantResp, err = lease.Grant(context.TODO(), 3); err != nil { fmt.Println(err) return } //拿到租约的id leaseId = leaseGrantResp.ID fmt.Println("租约id: ", leaseId) //获得kv对象 kv = clientv3.NewKV(client) //put一个kv,让他与租约关联起来,从而实现10秒后自动过期 //第三个参数是具体的值 if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "hello", clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) return } fmt.Println("写入成功: ", putResp.Header.Revision) //time.Sleep(5 * time.Second) //获取下kv if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("kv过期了") return } fmt.Println("读取成功: ", getResp.Kvs) } ~~~ ## 续租 ~~~ package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error lease clientv3.Lease //租约对象 leaseGrantResp *clientv3.LeaseGrantResponse //申请到的租约 leaseId clientv3.LeaseID //租约id putResp *clientv3.PutResponse //PUT对象 getResp *clientv3.GetResponse //get对象 kv clientv3.KV //kv操作对象 keepResp *clientv3.LeaseKeepAliveResponse //续租channel中的对象 keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //续租的channel ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //申请一个租约(lease) lease = clientv3.NewLease(client) //申请一个10秒的租约 if leaseGrantResp, err = lease.Grant(context.TODO(), 3); err != nil { fmt.Println(err) return } //拿到租约的id leaseId = leaseGrantResp.ID fmt.Println("租约id: ", leaseId) //自动续租 KeepAliveOnce只续租一次 KeepAlive是一直续租里面有个协程维护着 if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil { fmt.Println(err) return } //启动协程,消费这个租约 go func() { for { select { //每秒续租一次 case keepResp = <-keepRespChan: //如果维护租约中发生异常,网络重新连接后发现租约过期的话,或者我主动把context取消 if keepResp == nil { fmt.Println("租约已经失效了") //退出循环 goto END } else { //续租一切正常,打印租约id fmt.Println("收到自动续租应答: ", keepResp.ID) } } } END: }() //获得kv对象 kv = clientv3.NewKV(client) //put一个kv,让他与租约关联起来,从而实现10秒后自动过期 //第三个参数是具体的值 if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "hello", clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) return } fmt.Println("写入成功: ", putResp.Header.Revision) //time.Sleep(5 * time.Second) //获取下kv if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("kv过期了") return } fmt.Println("读取成功: ", getResp.Kvs) for { ; } } ~~~ ## op封装get和put ~~~ import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error kv clientv3.KV //kv操作对象 putOp clientv3.Op //op对象,赋值的 getOp clientv3.Op //op对象, 获取值的 opResp clientv3.OpResponse //op执行的返回结果 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //获得kv对象 kv = clientv3.NewKV(client) //Op:opeartion 代表一个操作,具体操作封装在里面 putOp = clientv3.OpPut("/cron/jobs/job8", "111") //执行op if opResp, err = kv.Do(context.TODO(), putOp); err != nil { fmt.Println(err) return } //把opeartion变为put的 opResp.Put() fmt.Println("写入Revision: ", opResp.Put().Header.Revision) //get的 getOp = clientv3.OpGet("/cron/jobs/job8") //执行op if opResp, err = kv.Do(context.TODO(), getOp); err != nil { fmt.Println(err) return } fmt.Println("读取数据: ", opResp.Get().Kvs) } ~~~ # 分布式锁 ~~~ package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main() { var ( config clientv3.Config //服务器配置 client *clientv3.Client //客户端连接对象 err error kv clientv3.KV //kv操作对象 lease clientv3.Lease //租约对象 leaseGrantResp *clientv3.LeaseGrantResponse //申请到的租约 leaseId clientv3.LeaseID //租约id keepResp *clientv3.LeaseKeepAliveResponse //续租channel中的对象 keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //续租的channel ctx context.Context //创建一个用于取消租约的context cancelFunc context.CancelFunc //取消上下文 txn clientv3.Txn //事务 txnResp *clientv3.TxnResponse //事务提交的返回值 ) //客户端配置 config = clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } //建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //关闭连接 defer client.Close() //获得kv对象 kv = clientv3.NewKV(client) //lease实现锁自动过期 //op操作 //txn事务: if else then //1.上锁(创建租约,自动续租,拿着租约去抢占一个key) //申请一个租约(lease) lease = clientv3.NewLease(client) //申请一个5秒的租约 if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { fmt.Println(err) return } //拿到租约的id leaseId = leaseGrantResp.ID //准备一个用于取消自动续租的context ctx, cancelFunc = context.WithCancel(context.TODO()) //确保函数退出后,自动续租会停止 defer cancelFunc() //确保租约释放 defer lease.Revoke(context.TODO(), leaseId) //自动续租 KeepAliveOnce只续租一次 KeepAlive是一直续租里面有个协程维护着 if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { fmt.Println(err) return } //启动协程,消费这个租约 go func() { for { select { //每秒续租一次 case keepResp = <-keepRespChan: //如果维护租约中发生异常,网络重新连接后发现租约过期的话,或者我主动把context取消 if keepResp == nil { fmt.Println("租约已经失效了") //退出循环 goto END } else { //续租一切正常,打印租约id fmt.Println("收到自动续租应答: ", keepResp.ID) } } } END: }() //if 不存在key,then设置他,else抢锁失败 //创建事务 txn = kv.Txn(context.TODO()) //定义事务 //job9的创建版本=0,满足了说明key不存在.满足就走then,不满足就走else txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)). Then(clientv3.OpPut("/cron/lock/job9", "", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("/cron/lock/job9")) //提交事务 if txnResp, err = txn.Commit(); err != nil { fmt.Println(err) return } //判断是否抢到了锁 if !txnResp.Succeeded { //没抢到锁的话,取else部分的返回值 fmt.Println("锁被占用: ", txnResp.Responses[0].GetResponseRange().Kvs) } //2. 处理业务 //在锁内很安全 fmt.Println("----------处理任务") time.Sleep(5 * time.Second) //3. 释放锁(取消自动续租,释放租约,一释放与租约关联的key就被删除了) //defer会把租约释放掉,关联的kv就被删除了 } ~~~