12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- package main
- import (
- "encoding/json"
- "fmt"
- rabbitmq "github.com/wagslane/go-rabbitmq"
- "log"
- )
- type Message struct {
- ID string `json:"id"`
- Method string `json:"method"`
- Body []byte `json:"body"`
- }
- var FuncMap = make(map[string]func(message *Message))
- var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz"
- var ExchangeName = "Agent_exchange1"
- var QueueName = "Agent_queue1"
- var ResultExchangeName = "Agent_exchange1"
- func InitMQ(RoutingKey string) {
- conn, err := rabbitmq.NewConn(
- MQUrl,
- //rabbitmq.WithConnectionOptionsLogging,
- )
- if err != nil {
- panic(err)
- }
- consumer, err := rabbitmq.NewConsumer(
- conn,
- QueueName,
- rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey),
- rabbitmq.WithConsumerOptionsExchangeDeclare,
- rabbitmq.WithConsumerOptionsExchangeName(ExchangeName),
- rabbitmq.WithConsumerOptionsExchangeKind("direct"),
- )
- if err != nil {
- InitMQ(RoutingKey)
- return
- }
- err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) {
- // 处理消息
- message, err := processMessage(d.Body)
- if err != nil {
- return
- }
- //log.Println("消息:", message)
- //调用函数
- if _, ok := FuncMap[message.Method]; !ok {
- return
- }
- FuncMap[message.Method](message)
- return rabbitmq.Ack
- })
- if err != nil {
- log.Println("消费消息错误:", err)
- InitMQ(RoutingKey)
- }
- return
- }
- func processMessage(msg []byte) (*Message, error) {
- var message Message
- err := json.Unmarshal(msg, &message)
- if err != nil {
- log.Printf("反序列化消息错误:%s\n", err)
- return nil, err
- }
- return &message, nil
- }
- func main() {
- go InitMQ("1")
- go InitMQ("2")
- FuncMap["test1"] = func(message *Message) {
- fmt.Println(message.ID + message.Method)
- }
- FuncMap["test2"] = func(message *Message) {
- fmt.Println(message.ID + message.Method)
- }
- select {}
- }
|