test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package main
  2. import (
  3. "fmt"
  4. "go.etcd.io/etcd/client/v3"
  5. "go.etcd.io/etcd/client/v3/concurrency"
  6. "log"
  7. "strings"
  8. "time"
  9. )
  10. //func main() {
  11. // var (
  12. // client *clientv3.Client
  13. // config clientv3.Config
  14. // err error
  15. // )
  16. //
  17. // config = clientv3.Config{
  18. // // 这里的 Endpoints 是一个字符串数组切片,支持配合多个节点
  19. // Endpoints: []string{"106.54.33.152:2379"},
  20. // // DialTimeout 连接超时设置
  21. // DialTimeout: time.Duration(5) * time.Millisecond,
  22. // }
  23. // if client, err = clientv3.New(config); err != nil {
  24. // return
  25. // }
  26. // kv := client.KV
  27. // ctx := context.Background()
  28. // c := map[string]interface{}{
  29. // "id": 3,
  30. // "name": "demo",
  31. // "age": 30,
  32. // }
  33. // data, _ := json.Marshal(c)
  34. //
  35. // var s = "/hstack/nodes/"
  36. // var nodes = []string{
  37. // "1.1.1.1",
  38. // "2.2.2.2",
  39. // "3.3.3.3",
  40. // }
  41. //
  42. // for _, node := range nodes {
  43. // newnodeKey := fmt.Sprintf("%s%s", s, node)
  44. // kv.Put(ctx, newnodeKey, string(data))
  45. // }
  46. //
  47. // get, err := kv.Get(ctx, s, clientv3.WithPrefix())
  48. // if err != nil {
  49. // return
  50. // }
  51. //
  52. // for _, kv := range get.Kvs {
  53. // after, _ := strings.CutPrefix(string(kv.Key), s)
  54. // fmt.Printf("%s : %s\n", after, string(kv.Value))
  55. // }
  56. //}
  57. //
  58. func main() {
  59. cli, err := clientv3.New(clientv3.Config{
  60. Endpoints: []string{"106.54.33.152:2379"},
  61. DialTimeout: 5 * time.Second,
  62. })
  63. if err != nil {
  64. panic(err)
  65. }
  66. defer cli.Close()
  67. session, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
  68. if err != nil {
  69. panic(err)
  70. }
  71. m := concurrency.NewMutex(session, "lock/test")
  72. for i := 0; i < 10; i++ {
  73. err = m.TryLock(ctx)
  74. if err != nil {
  75. fmt.Printf("Failed to lock: %v\n", err)
  76. time.Sleep(1 * time.Second)
  77. continue
  78. }
  79. fmt.Println("Locked")
  80. time.Sleep(1 * time.Second)
  81. m.Unlock(ctx)
  82. fmt.Println("Unlocked")
  83. }
  84. }
  85. // 保持租约
  86. func keepAlive(cli *clientv3.Client, leaseID clientv3.LeaseID) {
  87. alive, err := cli.KeepAlive(ctx, leaseID)
  88. if err != nil {
  89. fmt.Printf("Failed to keep alive: %v\n", err)
  90. return
  91. }
  92. //监听KeepAlive的响应
  93. go func() {
  94. for {
  95. ka := <-alive
  96. if ka == nil {
  97. log.Println("租约已失效或KeepAlive通道已关闭")
  98. return
  99. }
  100. log.Printf("续租成功: %v\n", ka)
  101. }
  102. }()
  103. }
  104. // 新增键值对
  105. func put(cli *clientv3.Client, key, value string) error {
  106. _, err := cli.Put(ctx, key, value)
  107. if err != nil {
  108. log.Fatal(err)
  109. return err
  110. }
  111. return nil
  112. }
  113. // 新增键值对,带有过期时间
  114. func putWithTTL(cli *clientv3.Client, key, value string, ttl int64) (int64, error) {
  115. leaseResp, err := cli.Grant(ctx, ttl)
  116. if err != nil {
  117. log.Fatal(err)
  118. return 0, err
  119. }
  120. _, err = cli.Put(ctx, key, value, clientv3.WithLease(leaseResp.ID))
  121. if err != nil {
  122. log.Fatal(err)
  123. return 0, err
  124. }
  125. return int64(leaseResp.ID), nil
  126. }
  127. // 监听键值变化
  128. func watch(cli *clientv3.Client, key string) {
  129. watchCh := cli.Watch(ctx, key, clientv3.WithPrefix())
  130. go func() {
  131. for event := range watchCh {
  132. if event.Err() != nil {
  133. fmt.Printf("Failed to watch: %v\n", event.Err())
  134. return
  135. }
  136. if len(event.Events) == 0 {
  137. fmt.Println("No events")
  138. return
  139. }
  140. fmt.Printf("Event: %s %q : %q\n", event.Events[0].Type, event.Events[0].Kv.Key, event.Events[0].Kv.Value)
  141. if event.Events[0].Type == clientv3.EventTypeDelete {
  142. fmt.Println("删除")
  143. fmt.Printf(" %v\n", event.Events[0])
  144. }
  145. }
  146. }()
  147. }
  148. // 获取键的值
  149. func get(cli *clientv3.Client, key string) (string, error) {
  150. resp, err := cli.Get(ctx, key)
  151. if err != nil {
  152. log.Fatal(err)
  153. return "", err
  154. }
  155. if resp.Count == 0 {
  156. return "", fmt.Errorf("key %s not found", key)
  157. }
  158. return string(resp.Kvs[0].Value), nil
  159. }
  160. // 获取当前前缀下所有的键并截取前缀
  161. func getAndCutPrefix(cli *clientv3.Client, prefix string) ([]string, error) {
  162. resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
  163. if err != nil {
  164. log.Fatal(err)
  165. return nil, err
  166. }
  167. keys := make([]string, 0)
  168. for _, kv := range resp.Kvs {
  169. keys = append(keys, strings.TrimPrefix(string(kv.Key), prefix))
  170. }
  171. return keys, nil
  172. }