test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package ra
  import (
  	"context"
  	"encoding/json"
  	"errors"
  	"log"
  	"time"

  	rabbitmq "github.com/wagslane/go-rabbitmq"
  )
  // Message is the JSON envelope exchanged over RabbitMQ by this package.
  // Incoming delivery bodies are decoded into it by processMessage, and
  // SendMessage encodes it before publishing.
  type Message struct {
  	// ID is carried under the "id" JSON key.
  	ID string `json:"id"`

  	// Method is carried under the "method" JSON key. The consumer uses it
  	// as the lookup key into FuncMap to pick the handler to run.
  	Method string `json:"method"`

  	// Body is carried under the "body" JSON key. Because it is a []byte,
  	// encoding/json represents it as a base64-encoded string on the wire.
  	Body []byte `json:"body"`
  }
  14. var FuncMap = make(map[string]func(message *Message))
  15. var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz"
  16. var ExchangeName = "Agent_exchange"
  17. var QueueName = "Agent_queue"
  18. var RoutingKey = "192.168.2.122"
  19. var ResultExchangeName = "Server_exchange"
  20. func InitMQ() {
  21. conn, err := rabbitmq.NewConn(
  22. MQUrl,
  23. rabbitmq.WithConnectionOptionsLogging,
  24. )
  25. if err != nil {
  26. panic(err)
  27. }
  28. consumer, err := rabbitmq.NewConsumer(
  29. conn,
  30. QueueName,
  31. rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey),
  32. rabbitmq.WithConsumerOptionsExchangeDeclare,
  33. rabbitmq.WithConsumerOptionsExchangeName(ExchangeName),
  34. rabbitmq.WithConsumerOptionsExchangeKind("direct"),
  35. )
  36. if err != nil {
  37. InitMQ()
  38. return
  39. }
  40. err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) {
  41. // 处理消息
  42. message, err := processMessage(d.Body)
  43. if err != nil {
  44. return
  45. }
  46. log.Println("消息:", message)
  47. //调用函数
  48. if _, ok := FuncMap[message.Method]; !ok {
  49. return
  50. }
  51. FuncMap[message.Method](message)
  52. return rabbitmq.Ack
  53. })
  54. if err != nil {
  55. log.Println("消费消息错误:", err)
  56. InitMQ()
  57. }
  58. return
  59. }
  60. func processMessage(msg []byte) (*Message, error) {
  61. var message Message
  62. err := json.Unmarshal(msg, &message)
  63. if err != nil {
  64. log.Printf("反序列化消息错误:%s\n", err)
  65. return nil, err
  66. }
  67. return &message, nil
  68. }
  69. func newPublisher(conn *rabbitmq.Conn) (*rabbitmq.Publisher, error) {
  70. pub, err := rabbitmq.NewPublisher(
  71. conn,
  72. )
  73. if err != nil {
  74. return nil, err
  75. }
  76. pub.NotifyReturn(func(r rabbitmq.Return) {
  77. log.Printf("message returned from server: %s", string(r.Body))
  78. })
  79. pub.NotifyPublish(func(c rabbitmq.Confirmation) {
  80. log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
  81. })
  82. return pub, nil
  83. }
  84. func SendMessage(message Message, key string) error {
  85. conn, err := rabbitmq.NewConn(
  86. MQUrl,
  87. rabbitmq.WithConnectionOptionsLogging,
  88. )
  89. defer conn.Close()
  90. pub, err := newPublisher(conn)
  91. if err != nil {
  92. return err
  93. }
  94. marshal, err := json.Marshal(message)
  95. if err != nil {
  96. log.Printf("序列化消息错误: %s\n", err)
  97. return err
  98. }
  99. confirms, err := pub.PublishWithDeferredConfirmWithContext(
  100. context.Background(),
  101. marshal,
  102. []string{key},
  103. rabbitmq.WithPublishOptionsContentType("application/json"),
  104. rabbitmq.WithPublishOptionsPriority(0),
  105. rabbitmq.WithPublishOptionsExpiration("60000"),
  106. rabbitmq.WithPublishOptionsExchange(ResultExchangeName),
  107. )
  108. if err != nil {
  109. return err
  110. }
  111. if len(confirms) == 0 || confirms[0] == nil {
  112. log.Println("no confirms received")
  113. return errors.New("no confirmations received")
  114. }
  115. ok, err := confirms[0].WaitContext(context.Background())
  116. if err != nil {
  117. return err
  118. }
  119. if !ok {
  120. log.Println("message was nack'd")
  121. return errors.New("message was nack'd")
  122. }
  123. return nil
  124. }