package main import ( "demo3/rabbitmq/RabbitMQ" "encoding/json" "fmt" "log" "time" ) //func main() { // rabbitmq := RabbitMQ.NewRabbitMQ("amqp://lxz:lxz664278@106.54.33.152:5672/lxz") // var qc RabbitMQ.QueueConfig // rabbitmq.NewExchange(context.Background(), "ra", "direct", // qc.Durable()) // rabbitmq.NewQueue(context.Background(), "qq") // rabbitmq.NewQueue(context.Background(), "qw") // rabbitmq.QueueBind("qq", "cc", "ra") // rabbitmq.QueueBind("qw", "aa", "ra") // M, _ := rabbitmq.Consume("qw", "") // for { // select { // case msg := <-M: // type Message struct { // Id string `json:"id"` // Method string `json:"method"` // Body []byte `json:"body"` // } // var m Message // json.Unmarshal(msg.Body, &m) // fmt.Println(m.Id, string(m.Body)) // } // } //} func main() { mq := RabbitMQ.NewRabbitMQ("amqp://lxz:lxz664278@106.54.33.152:5672/lxz") mq.NewExchange("Server_exchange", "direct", mq.ExchangeConfig.Durable()) mq.NewQueue("Server_queue", mq.QueueConfig.Durable()) err := mq.QueueBind("Server_queue", "Server_exchange", "192.168.2.122") if err != nil { return } m, err := mq.Consume("Server", "", mq.AutoAck()) if err != nil { panic(err) } // 消费消息 go func() { for { select { case msg := <-m: //顺序处理消息体 message, err := processMessage(msg.Body) if err != nil { continue } //调用函数 fmt.Println(message) } } }() time.Sleep(time.Second) } type Message struct { ID string `json:"id"` Method string `json:"method"` Body []byte `json:"body"` } func processMessage(msg []byte) (*Message, error) { var message Message err := json.Unmarshal(msg, &message) if err != nil { log.Printf("反序列化消息错误:%s\n", err) return nil, err } return &message, nil }