package main import ( "encoding/json" "fmt" rabbitmq "github.com/wagslane/go-rabbitmq" "log" ) type Message struct { ID string `json:"id"` Method string `json:"method"` Body []byte `json:"body"` } var FuncMap = make(map[string]func(message *Message)) var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz" var ExchangeName = "Agent_exchange1" var QueueName = "Agent_queue1" var ResultExchangeName = "Agent_exchange1" func InitMQ(RoutingKey string) { conn, err := rabbitmq.NewConn( MQUrl, //rabbitmq.WithConnectionOptionsLogging, ) if err != nil { panic(err) } consumer, err := rabbitmq.NewConsumer( conn, QueueName, rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey), rabbitmq.WithConsumerOptionsExchangeDeclare, rabbitmq.WithConsumerOptionsExchangeName(ExchangeName), rabbitmq.WithConsumerOptionsExchangeKind("direct"), ) if err != nil { InitMQ(RoutingKey) return } err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) { // 处理消息 message, err := processMessage(d.Body) if err != nil { return } //log.Println("消息:", message) //调用函数 if _, ok := FuncMap[message.Method]; !ok { return } FuncMap[message.Method](message) return rabbitmq.Ack }) if err != nil { log.Println("消费消息错误:", err) InitMQ(RoutingKey) } return } func processMessage(msg []byte) (*Message, error) { var message Message err := json.Unmarshal(msg, &message) if err != nil { log.Printf("反序列化消息错误:%s\n", err) return nil, err } return &message, nil } func main() { go InitMQ("1") go InitMQ("2") FuncMap["test1"] = func(message *Message) { fmt.Println(message.ID + message.Method) } FuncMap["test2"] = func(message *Message) { fmt.Println(message.ID + message.Method) } select {} }