test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "go.etcd.io/etcd/client/v3"
  6. "log"
  7. "time"
  8. )
  9. //func main() {
  10. // var (
  11. // client *clientv3.Client
  12. // config clientv3.Config
  13. // err error
  14. // )
  15. //
  16. // config = clientv3.Config{
  17. // // 这里的 Endpoints 是一个字符串数组切片,支持配合多个节点
  18. // Endpoints: []string{"106.54.33.152:2379"},
  19. // // DialTimeout 连接超时设置
  20. // DialTimeout: time.Duration(5) * time.Millisecond,
  21. // }
  22. // if client, err = clientv3.New(config); err != nil {
  23. // return
  24. // }
  25. // kv := client.KV
  26. // ctx := context.Background()
  27. // c := map[string]interface{}{
  28. // "id": 3,
  29. // "name": "demo",
  30. // "age": 30,
  31. // }
  32. // data, _ := json.Marshal(c)
  33. //
  34. // var s = "/hstack/nodes/"
  35. // var nodes = []string{
  36. // "1.1.1.1",
  37. // "2.2.2.2",
  38. // "3.3.3.3",
  39. // }
  40. //
  41. // for _, node := range nodes {
  42. // newnodeKey := fmt.Sprintf("%s%s", s, node)
  43. // kv.Put(ctx, newnodeKey, string(data))
  44. // }
  45. //
  46. // get, err := kv.Get(ctx, s, clientv3.WithPrefix())
  47. // if err != nil {
  48. // return
  49. // }
  50. //
  51. // for _, kv := range get.Kvs {
  52. // after, _ := strings.CutPrefix(string(kv.Key), s)
  53. // fmt.Printf("%s : %s\n", after, string(kv.Value))
  54. // }
  55. //}
  56. //
  57. func main() {
  58. cli, err := clientv3.New(clientv3.Config{
  59. Endpoints: []string{"106.54.33.152:2379"},
  60. DialTimeout: 5 * time.Second,
  61. })
  62. if err != nil {
  63. panic(err)
  64. }
  65. defer cli.Close()
  66. ctx, cancel := context.WithCancel(context.Background())
  67. defer cancel()
  68. // 开始监听以 "/services/" 为前缀的键值变化
  69. watchCh := cli.Watch(ctx, "/services/", clientv3.WithPrefix())
  70. go func() {
  71. for event := range watchCh {
  72. if event.Err() != nil {
  73. fmt.Printf("Failed to watch: %v\n", event.Err())
  74. return
  75. }
  76. if len(event.Events) == 0 {
  77. fmt.Println("No events")
  78. return
  79. }
  80. fmt.Printf("Event: %s %q : %q\n", event.Events[0].Type, event.Events[0].Kv.Key, event.Events[0].Kv.Value)
  81. if event.Events[0].Type == clientv3.EventTypeDelete {
  82. fmt.Println("删除")
  83. fmt.Printf(" %v\n", event.Events[0])
  84. }
  85. }
  86. }()
  87. // 创建租约
  88. leaseResp, err := cli.Grant(ctx, 15) // 租约时间为 15 秒
  89. if err != nil {
  90. fmt.Printf("Failed to grant lease: %v\n", err)
  91. return
  92. }
  93. // 使用租约创建服务实例的键
  94. _, err = cli.Put(ctx, "/services/service1", "", clientv3.WithLease(leaseResp.ID))
  95. if err != nil {
  96. fmt.Printf("Failed to put service instance: %v\n", err)
  97. return
  98. }
  99. alive, err := cli.KeepAlive(ctx, leaseResp.ID) // 续租(ttl/3)
  100. if err != nil {
  101. fmt.Printf("Failed to keep alive: %v\n", err)
  102. return
  103. }
  104. //监听KeepAlive的响应
  105. go func() {
  106. for {
  107. ka := <-alive
  108. if ka == nil {
  109. log.Println("租约已失效或KeepAlive通道已关闭")
  110. return
  111. }
  112. log.Printf("续租成功: %v\n", ka)
  113. }
  114. }()
  115. fmt.Println("Watching for changes...")
  116. get, _ := cli.Get(ctx, "/services/service1")
  117. fmt.Println(get.Kvs)
  118. // 处理监听结果
  119. go func() {
  120. time.Sleep(10 * time.Second)
  121. // 注意: 程序退出时,应当撤销租约或删除键
  122. _, err = cli.Revoke(context.TODO(), leaseResp.ID)
  123. if err != nil {
  124. log.Fatal(err)
  125. }
  126. }()
  127. time.Sleep(15 * time.Second)
  128. }