package ra import ( "context" "encoding/json" "errors" rabbitmq "github.com/wagslane/go-rabbitmq" "log" ) type Message struct { ID string `json:"id"` Method string `json:"method"` Body []byte `json:"body"` } var FuncMap = make(map[string]func(message *Message)) var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz" var ExchangeName = "Agent_exchange" var QueueName = "Agent_queue" var RoutingKey = "192.168.2.122" var ResultExchangeName = "Server_exchange" func InitMQ() { conn, err := rabbitmq.NewConn( MQUrl, rabbitmq.WithConnectionOptionsLogging, ) if err != nil { panic(err) } consumer, err := rabbitmq.NewConsumer( conn, QueueName, rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey), rabbitmq.WithConsumerOptionsExchangeDeclare, rabbitmq.WithConsumerOptionsExchangeName(ExchangeName), rabbitmq.WithConsumerOptionsExchangeKind("direct"), ) if err != nil { InitMQ() return } err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) { // 处理消息 message, err := processMessage(d.Body) if err != nil { return } log.Println("消息:", message) //调用函数 if _, ok := FuncMap[message.Method]; !ok { return } FuncMap[message.Method](message) return rabbitmq.Ack }) if err != nil { log.Println("消费消息错误:", err) InitMQ() } return } func processMessage(msg []byte) (*Message, error) { var message Message err := json.Unmarshal(msg, &message) if err != nil { log.Printf("反序列化消息错误:%s\n", err) return nil, err } return &message, nil } func newPublisher(conn *rabbitmq.Conn) (*rabbitmq.Publisher, error) { pub, err := rabbitmq.NewPublisher( conn, ) if err != nil { return nil, err } pub.NotifyReturn(func(r rabbitmq.Return) { log.Printf("message returned from server: %s", string(r.Body)) }) pub.NotifyPublish(func(c rabbitmq.Confirmation) { log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) }) return pub, nil } func SendMessage(message Message, key string) error { conn, err := rabbitmq.NewConn( MQUrl, rabbitmq.WithConnectionOptionsLogging, ) defer conn.Close() pub, err := newPublisher(conn) if err != nil { return err } marshal, err := json.Marshal(message) if err != nil { log.Printf("序列化消息错误: %s\n", err) return err } confirms, err := pub.PublishWithDeferredConfirmWithContext( context.Background(), marshal, []string{key}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsPriority(0), rabbitmq.WithPublishOptionsExpiration("60000"), rabbitmq.WithPublishOptionsExchange(ResultExchangeName), ) if err != nil { return err } if len(confirms) == 0 || confirms[0] == nil { log.Println("no confirms received") return errors.New("no confirmations received") } ok, err := confirms[0].WaitContext(context.Background()) if err != nil { return err } if !ok { log.Println("message was nack'd") return errors.New("message was nack'd") } return nil }