mainSimpleRecieve.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package main
  2. import (
  3. "demo3/rabbitmq/RabbitMQ"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "time"
  8. )
  9. //func main() {
  10. // rabbitmq := RabbitMQ.NewRabbitMQ("amqp://lxz:lxz664278@106.54.33.152:5672/lxz")
  11. // var qc RabbitMQ.QueueConfig
  12. // rabbitmq.NewExchange(context.Background(), "ra", "direct",
  13. // qc.Durable())
  14. // rabbitmq.NewQueue(context.Background(), "qq")
  15. // rabbitmq.NewQueue(context.Background(), "qw")
  16. // rabbitmq.QueueBind("qq", "cc", "ra")
  17. // rabbitmq.QueueBind("qw", "aa", "ra")
  18. // M, _ := rabbitmq.Consume("qw", "")
  19. // for {
  20. // select {
  21. // case msg := <-M:
  22. // type Message struct {
  23. // Id string `json:"id"`
  24. // Method string `json:"method"`
  25. // Body []byte `json:"body"`
  26. // }
  27. // var m Message
  28. // json.Unmarshal(msg.Body, &m)
  29. // fmt.Println(m.Id, string(m.Body))
  30. // }
  31. // }
  32. //}
  33. func main() {
  34. mq := RabbitMQ.NewRabbitMQ("amqp://lxz:lxz664278@106.54.33.152:5672/lxz")
  35. mq.NewExchange("Server_exchange", "direct", mq.ExchangeConfig.Durable())
  36. mq.NewQueue("Server_queue", mq.QueueConfig.Durable())
  37. err := mq.QueueBind("Server_queue", "Server_exchange", "192.168.2.122")
  38. if err != nil {
  39. return
  40. }
  41. m, err := mq.Consume("Server", "", mq.AutoAck())
  42. if err != nil {
  43. panic(err)
  44. }
  45. // 消费消息
  46. go func() {
  47. for {
  48. select {
  49. case msg := <-m:
  50. //顺序处理消息体
  51. message, err := processMessage(msg.Body)
  52. if err != nil {
  53. continue
  54. }
  55. //调用函数
  56. fmt.Println(message)
  57. }
  58. }
  59. }()
  60. time.Sleep(time.Second)
  61. }
  62. type Message struct {
  63. ID string `json:"id"`
  64. Method string `json:"method"`
  65. Body []byte `json:"body"`
  66. }
  67. func processMessage(msg []byte) (*Message, error) {
  68. var message Message
  69. err := json.Unmarshal(msg, &message)
  70. if err != nil {
  71. log.Printf("反序列化消息错误:%s\n", err)
  72. return nil, err
  73. }
  74. return &message, nil
  75. }