// test.go
  1. package ra
import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"time"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)
  9. type Message struct {
  10. ID string `json:"id"`
  11. Method string `json:"method"`
  12. Body []byte `json:"body"`
  13. }
  14. var FuncMap = make(map[string]func(message *Message))
  15. var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz"
  16. var ExchangeName = "Agent_exchange1"
  17. var QueueName = "Agent_queue1"
  18. var RoutingKey = "192.168.2.122"
  19. var ResultExchangeName = "Server_exchange"
  20. func InitMQ() {
  21. conn, err := rabbitmq.NewConn(
  22. MQUrl,
  23. rabbitmq.WithConnectionOptionsLogging,
  24. )
  25. if err != nil {
  26. panic(err)
  27. }
  28. consumer, err := rabbitmq.NewConsumer(
  29. conn,
  30. QueueName,
  31. rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey),
  32. rabbitmq.WithConsumerOptionsExchangeDeclare,
  33. rabbitmq.WithConsumerOptionsExchangeName(ExchangeName),
  34. rabbitmq.WithConsumerOptionsExchangeKind("direct"),
  35. )
  36. if err != nil {
  37. InitMQ()
  38. return
  39. }
  40. err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) {
  41. // 处理消息
  42. message, err := processMessage(d.Body)
  43. if err != nil {
  44. return
  45. }
  46. log.Println("消息:", message)
  47. //调用函数
  48. if _, ok := FuncMap[message.Method]; !ok {
  49. return
  50. }
  51. FuncMap[message.Method](message)
  52. return rabbitmq.Ack
  53. })
  54. if err != nil {
  55. log.Println("消费消息错误:", err)
  56. InitMQ()
  57. }
  58. return
  59. }
  60. func processMessage(msg []byte) (*Message, error) {
  61. var message Message
  62. err := json.Unmarshal(msg, &message)
  63. if err != nil {
  64. log.Printf("反序列化消息错误:%s\n", err)
  65. return nil, err
  66. }
  67. return &message, nil
  68. }
  69. func NewPublisher(conn *rabbitmq.Conn) (*rabbitmq.Publisher, error) {
  70. pub, err := rabbitmq.NewPublisher(
  71. conn,
  72. )
  73. if err != nil {
  74. return nil, err
  75. }
  76. pub.NotifyReturn(func(r rabbitmq.Return) {
  77. //log.Printf("message returned from server: %s", string(r.Body))
  78. })
  79. pub.NotifyPublish(func(c rabbitmq.Confirmation) {
  80. //log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
  81. })
  82. return pub, nil
  83. }
  84. func SendMessage(message Message, key string, pub *rabbitmq.Publisher) error {
  85. marshal, err := json.Marshal(message)
  86. if err != nil {
  87. log.Printf("序列化消息错误: %s\n", err)
  88. return err
  89. }
  90. confirms, err := pub.PublishWithDeferredConfirmWithContext(
  91. context.Background(),
  92. marshal,
  93. []string{key},
  94. rabbitmq.WithPublishOptionsContentType("application/json"),
  95. rabbitmq.WithPublishOptionsPriority(0),
  96. rabbitmq.WithPublishOptionsExpiration("60000"),
  97. rabbitmq.WithPublishOptionsExchange(ResultExchangeName),
  98. )
  99. if err != nil {
  100. return err
  101. }
  102. if len(confirms) == 0 || confirms[0] == nil {
  103. log.Println("no confirms received")
  104. return errors.New("no confirmations received")
  105. }
  106. ok, err := confirms[0].WaitContext(context.Background())
  107. if err != nil {
  108. return err
  109. }
  110. if !ok {
  111. log.Println("message was nack'd")
  112. return errors.New("message was nack'd")
  113. }
  114. return nil
  115. }