test.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. rabbitmq "github.com/wagslane/go-rabbitmq"
  6. "log"
  7. )
  8. type Message struct {
  9. ID string `json:"id"`
  10. Method string `json:"method"`
  11. Body []byte `json:"body"`
  12. }
  13. var FuncMap = make(map[string]func(message *Message))
  14. var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz"
  15. var ExchangeName = "Agent_exchange1"
  16. var QueueName = "Agent_queue1"
  17. var ResultExchangeName = "Agent_exchange1"
  18. func InitMQ(RoutingKey string) {
  19. conn, err := rabbitmq.NewConn(
  20. MQUrl,
  21. //rabbitmq.WithConnectionOptionsLogging,
  22. )
  23. if err != nil {
  24. panic(err)
  25. }
  26. consumer, err := rabbitmq.NewConsumer(
  27. conn,
  28. QueueName,
  29. rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey),
  30. rabbitmq.WithConsumerOptionsExchangeDeclare,
  31. rabbitmq.WithConsumerOptionsExchangeName(ExchangeName),
  32. rabbitmq.WithConsumerOptionsExchangeKind("direct"),
  33. )
  34. if err != nil {
  35. InitMQ(RoutingKey)
  36. return
  37. }
  38. err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) {
  39. // 处理消息
  40. message, err := processMessage(d.Body)
  41. if err != nil {
  42. return
  43. }
  44. //log.Println("消息:", message)
  45. //调用函数
  46. if _, ok := FuncMap[message.Method]; !ok {
  47. return
  48. }
  49. FuncMap[message.Method](message)
  50. return rabbitmq.Ack
  51. })
  52. if err != nil {
  53. log.Println("消费消息错误:", err)
  54. InitMQ(RoutingKey)
  55. }
  56. return
  57. }
  58. func processMessage(msg []byte) (*Message, error) {
  59. var message Message
  60. err := json.Unmarshal(msg, &message)
  61. if err != nil {
  62. log.Printf("反序列化消息错误:%s\n", err)
  63. return nil, err
  64. }
  65. return &message, nil
  66. }
  67. func main() {
  68. go InitMQ("1")
  69. go InitMQ("2")
  70. FuncMap["test1"] = func(message *Message) {
  71. fmt.Println(message.ID + message.Method)
  72. }
  73. FuncMap["test2"] = func(message *Message) {
  74. fmt.Println(message.ID + message.Method)
  75. }
  76. select {}
  77. }