package main

import (
	"fmt"
	"log"
	"strings"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// Legacy example kept for reference: writes a JSON document under several
// node keys and lists them back by prefix.
//
//func main() {
//	var (
//		client *clientv3.Client
//		config clientv3.Config
//		err    error
//	)
//
//	config = clientv3.Config{
//		// Endpoints is a slice of strings, so several nodes can be configured.
//		Endpoints: []string{"106.54.33.152:2379"},
//		// DialTimeout is the connection timeout.
//		DialTimeout: time.Duration(5) * time.Millisecond,
//	}
//	if client, err = clientv3.New(config); err != nil {
//		return
//	}
//	kv := client.KV
//	ctx := context.Background()
//	c := map[string]interface{}{
//		"id":   3,
//		"name": "demo",
//		"age":  30,
//	}
//	data, _ := json.Marshal(c)
//
//	var s = "/hstack/nodes/"
//	var nodes = []string{
//		"1.1.1.1",
//		"2.2.2.2",
//		"3.3.3.3",
//	}
//
//	for _, node := range nodes {
//		newnodeKey := fmt.Sprintf("%s%s", s, node)
//		kv.Put(ctx, newnodeKey, string(data))
//	}
//
//	get, err := kv.Get(ctx, s, clientv3.WithPrefix())
//	if err != nil {
//		return
//	}
//
//	for _, kv := range get.Kvs {
//		after, _ := strings.CutPrefix(string(kv.Key), s)
//		fmt.Printf("%s : %s\n", after, string(kv.Value))
//	}
//}
//

// NOTE(review): ctx is used throughout this file but is not declared here;
// it is presumably a package-level context.Context defined in another file
// of package main — confirm.

// main connects to etcd, opens a session with a 5-second TTL and then tries
// ten times to take and release the "lock/test" mutex, sleeping one second
// after each failed attempt and one second while holding the lock.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"106.54.33.152:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	session, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
	if err != nil {
		panic(err)
	}
	// Close the session so its lease is released on exit instead of
	// lingering on the server until the TTL runs out.
	defer session.Close()

	m := concurrency.NewMutex(session, "lock/test")
	for i := 0; i < 10; i++ {
		if err := m.TryLock(ctx); err != nil {
			fmt.Printf("Failed to lock: %v\n", err)
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Println("Locked")
		time.Sleep(1 * time.Second)

		// Only report "Unlocked" when the release actually succeeded.
		if err := m.Unlock(ctx); err != nil {
			fmt.Printf("Failed to unlock: %v\n", err)
			continue
		}
		fmt.Println("Unlocked")
	}
}

// keepAlive starts keep-alive for leaseID and logs every response in a
// background goroutine. If the keep-alive cannot be started the error is
// printed and no goroutine is launched. The goroutine ends when the response
// channel is closed.
func keepAlive(cli *clientv3.Client, leaseID clientv3.LeaseID) {
	alive, err := cli.KeepAlive(ctx, leaseID)
	if err != nil {
		fmt.Printf("Failed to keep alive: %v\n", err)
		return
	}

	// Consume keep-alive responses until the channel is closed.
	go func() {
		for ka := range alive {
			log.Printf("续租成功: %v\n", ka)
		}
		log.Println("租约已失效或KeepAlive通道已关闭")
	}()
}

// put stores value under key. It returns a wrapped error if the write fails;
// the caller decides how to handle it.
func put(cli *clientv3.Client, key, value string) error {
	if _, err := cli.Put(ctx, key, value); err != nil {
		return fmt.Errorf("put %q: %w", key, err)
	}
	return nil
}

// putWithTTL grants a lease of ttl seconds, stores value under key attached
// to that lease and returns the lease ID. On failure it returns 0 and a
// wrapped error.
func putWithTTL(cli *clientv3.Client, key, value string, ttl int64) (int64, error) {
	leaseResp, err := cli.Grant(ctx, ttl)
	if err != nil {
		return 0, fmt.Errorf("grant lease (ttl %d): %w", ttl, err)
	}
	if _, err := cli.Put(ctx, key, value, clientv3.WithLease(leaseResp.ID)); err != nil {
		return 0, fmt.Errorf("put %q with lease: %w", key, err)
	}
	return int64(leaseResp.ID), nil
}

// watch observes every key that has key as a prefix and prints each change
// from a background goroutine. The goroutine stops on the first watch error
// or when the watch channel is closed.
func watch(cli *clientv3.Client, key string) {
	watchCh := cli.Watch(ctx, key, clientv3.WithPrefix())
	go func() {
		for resp := range watchCh {
			if err := resp.Err(); err != nil {
				fmt.Printf("Failed to watch: %v\n", err)
				return
			}
			if len(resp.Events) == 0 {
				// An empty response is not an error; keep watching.
				fmt.Println("No events")
				continue
			}
			// A single response may carry several events; report all of them.
			for _, ev := range resp.Events {
				fmt.Printf("Event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				if ev.Type == clientv3.EventTypeDelete {
					fmt.Println("删除")
					fmt.Printf(" %v\n", ev)
				}
			}
		}
	}()
}

// get returns the value stored under key. It returns an error if the request
// fails or if the key does not exist.
func get(cli *clientv3.Client, key string) (string, error) {
	resp, err := cli.Get(ctx, key)
	if err != nil {
		return "", fmt.Errorf("get %q: %w", key, err)
	}
	// Check the slice that is indexed below.
	if len(resp.Kvs) == 0 {
		return "", fmt.Errorf("key %s not found", key)
	}
	return string(resp.Kvs[0].Value), nil
}

// getAndCutPrefix lists every key stored under prefix and returns the keys
// with prefix stripped. The result is an empty, non-nil slice when nothing
// matches.
func getAndCutPrefix(cli *clientv3.Client, prefix string) ([]string, error) {
	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, fmt.Errorf("get prefix %q: %w", prefix, err)
	}
	keys := make([]string, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		keys = append(keys, strings.TrimPrefix(string(kv.Key), prefix))
	}
	return keys, nil
}