package main import ( "context" "fmt" "go.etcd.io/etcd/client/v3" "log" "time" ) //func main() { // var ( // client *clientv3.Client // config clientv3.Config // err error // ) // // config = clientv3.Config{ // // 这里的 Endpoints 是一个字符串数组切片,支持配合多个节点 // Endpoints: []string{"106.54.33.152:2379"}, // // DialTimeout 连接超时设置 // DialTimeout: time.Duration(5) * time.Millisecond, // } // if client, err = clientv3.New(config); err != nil { // return // } // kv := client.KV // ctx := context.Background() // c := map[string]interface{}{ // "id": 3, // "name": "demo", // "age": 30, // } // data, _ := json.Marshal(c) // // var s = "/hstack/nodes/" // var nodes = []string{ // "1.1.1.1", // "2.2.2.2", // "3.3.3.3", // } // // for _, node := range nodes { // newnodeKey := fmt.Sprintf("%s%s", s, node) // kv.Put(ctx, newnodeKey, string(data)) // } // // get, err := kv.Get(ctx, s, clientv3.WithPrefix()) // if err != nil { // return // } // // for _, kv := range get.Kvs { // after, _ := strings.CutPrefix(string(kv.Key), s) // fmt.Printf("%s : %s\n", after, string(kv.Value)) // } //} // func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"106.54.33.152:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } defer cli.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 开始监听以 "/services/" 为前缀的键值变化 watchCh := cli.Watch(ctx, "/services/", clientv3.WithPrefix()) go func() { for event := range watchCh { if event.Err() != nil { fmt.Printf("Failed to watch: %v\n", event.Err()) return } if len(event.Events) == 0 { fmt.Println("No events") return } fmt.Printf("Event: %s %q : %q\n", event.Events[0].Type, event.Events[0].Kv.Key, event.Events[0].Kv.Value) if event.Events[0].Type == clientv3.EventTypeDelete { fmt.Println("删除") fmt.Printf(" %v\n", event.Events[0]) } } }() // 创建租约 leaseResp, err := cli.Grant(ctx, 15) // 租约时间为 15 秒 if err != nil { fmt.Printf("Failed to grant lease: %v\n", err) return } // 使用租约创建服务实例的键 _, err = cli.Put(ctx, "/services/service1", "", clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Printf("Failed to put service instance: %v\n", err) return } alive, err := cli.KeepAlive(ctx, leaseResp.ID) // 续租(ttl/3) if err != nil { fmt.Printf("Failed to keep alive: %v\n", err) return } //监听KeepAlive的响应 go func() { for { ka := <-alive if ka == nil { log.Println("租约已失效或KeepAlive通道已关闭") return } log.Printf("续租成功: %v\n", ka) } }() fmt.Println("Watching for changes...") get, _ := cli.Get(ctx, "/services/service1") fmt.Println(get.Kvs) // 处理监听结果 go func() { time.Sleep(10 * time.Second) // 注意: 程序退出时,应当撤销租约或删除键 _, err = cli.Revoke(context.TODO(), leaseResp.ID) if err != nil { log.Fatal(err) } }() time.Sleep(15 * time.Second) }