123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package main
- import (
- "demo3/rabbitmq/RabbitMQ"
- "encoding/json"
- "fmt"
- "log"
- "time"
- )
- //func main() {
- // rabbitmq := RabbitMQ.NewRabbitMQ("amqp://lxz:lxz664278@106.54.33.152:5672/lxz")
- // var qc RabbitMQ.QueueConfig
- // rabbitmq.NewExchange(context.Background(), "ra", "direct",
- // qc.Durable())
- // rabbitmq.NewQueue(context.Background(), "qq")
- // rabbitmq.NewQueue(context.Background(), "qw")
- // rabbitmq.QueueBind("qq", "cc", "ra")
- // rabbitmq.QueueBind("qw", "aa", "ra")
- // M, _ := rabbitmq.Consume("qw", "")
- // for {
- // select {
- // case msg := <-M:
- // type Message struct {
- // Id string `json:"id"`
- // Method string `json:"method"`
- // Body []byte `json:"body"`
- // }
- // var m Message
- // json.Unmarshal(msg.Body, &m)
- // fmt.Println(m.Id, string(m.Body))
- // }
- // }
- //}
- func main() {
- mq := RabbitMQ.NewRabbitMQ("amqp://lxz:lxz664278@106.54.33.152:5672/lxz")
- mq.NewExchange("Server_exchange", "direct", mq.ExchangeConfig.Durable())
- mq.NewQueue("Server_queue", mq.QueueConfig.Durable())
- err := mq.QueueBind("Server_queue", "Server_exchange", "192.168.2.122")
- if err != nil {
- return
- }
- m, err := mq.Consume("Server", "", mq.AutoAck())
- if err != nil {
- panic(err)
- }
- // 消费消息
- go func() {
- for {
- select {
- case msg := <-m:
- //顺序处理消息体
- message, err := processMessage(msg.Body)
- if err != nil {
- continue
- }
- //调用函数
- fmt.Println(message)
- }
- }
- }()
- time.Sleep(time.Second)
- }
- type Message struct {
- ID string `json:"id"`
- Method string `json:"method"`
- Body []byte `json:"body"`
- }
- func processMessage(msg []byte) (*Message, error) {
- var message Message
- err := json.Unmarshal(msg, &message)
- if err != nil {
- log.Printf("反序列化消息错误:%s\n", err)
- return nil, err
- }
- return &message, nil
- }
|