123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- package main
- import (
- "fmt"
- "go.etcd.io/etcd/client/v3"
- "go.etcd.io/etcd/client/v3/concurrency"
- "log"
- "strings"
- "time"
- )
- //func main() {
- // var (
- // client *clientv3.Client
- // config clientv3.Config
- // err error
- // )
- //
- // config = clientv3.Config{
- // // Endpoints is a slice of strings, so several nodes can be configured
- // Endpoints: []string{"106.54.33.152:2379"},
- // // DialTimeout sets the connection timeout
- // DialTimeout: time.Duration(5) * time.Millisecond,
- // }
- // if client, err = clientv3.New(config); err != nil {
- // return
- // }
- // kv := client.KV
- // ctx := context.Background()
- // c := map[string]interface{}{
- // "id": 3,
- // "name": "demo",
- // "age": 30,
- // }
- // data, _ := json.Marshal(c)
- //
- // var s = "/hstack/nodes/"
- // var nodes = []string{
- // "1.1.1.1",
- // "2.2.2.2",
- // "3.3.3.3",
- // }
- //
- // for _, node := range nodes {
- // newnodeKey := fmt.Sprintf("%s%s", s, node)
- // kv.Put(ctx, newnodeKey, string(data))
- // }
- //
- // get, err := kv.Get(ctx, s, clientv3.WithPrefix())
- // if err != nil {
- // return
- // }
- //
- // for _, kv := range get.Kvs {
- // after, _ := strings.CutPrefix(string(kv.Key), s)
- // fmt.Printf("%s : %s\n", after, string(kv.Value))
- // }
- //}
- //
- func main() {
- cli, err := clientv3.New(clientv3.Config{
- Endpoints: []string{"106.54.33.152:2379"},
- DialTimeout: 5 * time.Second,
- })
- if err != nil {
- panic(err)
- }
- defer cli.Close()
- session, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
- if err != nil {
- panic(err)
- }
- m := concurrency.NewMutex(session, "lock/test")
- for i := 0; i < 10; i++ {
- err = m.TryLock(ctx)
- if err != nil {
- fmt.Printf("Failed to lock: %v\n", err)
- time.Sleep(1 * time.Second)
- continue
- }
- fmt.Println("Locked")
- time.Sleep(1 * time.Second)
- m.Unlock(ctx)
- fmt.Println("Unlocked")
- }
- }
- // 保持租约
- func keepAlive(cli *clientv3.Client, leaseID clientv3.LeaseID) {
- alive, err := cli.KeepAlive(ctx, leaseID)
- if err != nil {
- fmt.Printf("Failed to keep alive: %v\n", err)
- return
- }
- //监听KeepAlive的响应
- go func() {
- for {
- ka := <-alive
- if ka == nil {
- log.Println("租约已失效或KeepAlive通道已关闭")
- return
- }
- log.Printf("续租成功: %v\n", ka)
- }
- }()
- }
- // 新增键值对
- func put(cli *clientv3.Client, key, value string) error {
- _, err := cli.Put(ctx, key, value)
- if err != nil {
- log.Fatal(err)
- return err
- }
- return nil
- }
- // 新增键值对,带有过期时间
- func putWithTTL(cli *clientv3.Client, key, value string, ttl int64) (int64, error) {
- leaseResp, err := cli.Grant(ctx, ttl)
- if err != nil {
- log.Fatal(err)
- return 0, err
- }
- _, err = cli.Put(ctx, key, value, clientv3.WithLease(leaseResp.ID))
- if err != nil {
- log.Fatal(err)
- return 0, err
- }
- return int64(leaseResp.ID), nil
- }
- // 监听键值变化
- func watch(cli *clientv3.Client, key string) {
- watchCh := cli.Watch(ctx, key, clientv3.WithPrefix())
- go func() {
- for event := range watchCh {
- if event.Err() != nil {
- fmt.Printf("Failed to watch: %v\n", event.Err())
- return
- }
- if len(event.Events) == 0 {
- fmt.Println("No events")
- return
- }
- fmt.Printf("Event: %s %q : %q\n", event.Events[0].Type, event.Events[0].Kv.Key, event.Events[0].Kv.Value)
- if event.Events[0].Type == clientv3.EventTypeDelete {
- fmt.Println("删除")
- fmt.Printf(" %v\n", event.Events[0])
- }
- }
- }()
- }
- // 获取键的值
- func get(cli *clientv3.Client, key string) (string, error) {
- resp, err := cli.Get(ctx, key)
- if err != nil {
- log.Fatal(err)
- return "", err
- }
- if resp.Count == 0 {
- return "", fmt.Errorf("key %s not found", key)
- }
- return string(resp.Kvs[0].Value), nil
- }
- // 获取当前前缀下所有的键并截取前缀
- func getAndCutPrefix(cli *clientv3.Client, prefix string) ([]string, error) {
- resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
- if err != nil {
- log.Fatal(err)
- return nil, err
- }
- keys := make([]string, 0)
- for _, kv := range resp.Kvs {
- keys = append(keys, strings.TrimPrefix(string(kv.Key), prefix))
- }
- return keys, nil
- }
|