123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package ra
import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"time"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)
// Message is the JSON envelope exchanged over the message queue.
type Message struct {
	// ID is carried through unchanged; this file never inspects it.
	ID string `json:"id"`
	// Method is the key used to look up the handler in FuncMap.
	Method string `json:"method"`
	// Body is an opaque payload handed to the handler as-is
	// (encoding/json represents a []byte as a base64 string on the wire).
	Body []byte `json:"body"`
}
- var FuncMap = make(map[string]func(message *Message))
- var MQUrl = "amqp://lxz:lxz664278@106.54.33.152:5672/lxz"
- var ExchangeName = "Agent_exchange"
- var QueueName = "Agent_queue"
- var RoutingKey = "192.168.2.122"
- var ResultExchangeName = "Server_exchange"
- func InitMQ() {
- conn, err := rabbitmq.NewConn(
- MQUrl,
- rabbitmq.WithConnectionOptionsLogging,
- )
- if err != nil {
- panic(err)
- }
- consumer, err := rabbitmq.NewConsumer(
- conn,
- QueueName,
- rabbitmq.WithConsumerOptionsRoutingKey(RoutingKey),
- rabbitmq.WithConsumerOptionsExchangeDeclare,
- rabbitmq.WithConsumerOptionsExchangeName(ExchangeName),
- rabbitmq.WithConsumerOptionsExchangeKind("direct"),
- )
- if err != nil {
- InitMQ()
- return
- }
- err = consumer.Run(func(d rabbitmq.Delivery) (action rabbitmq.Action) {
- // 处理消息
- message, err := processMessage(d.Body)
- if err != nil {
- return
- }
- log.Println("消息:", message)
- //调用函数
- if _, ok := FuncMap[message.Method]; !ok {
- return
- }
- FuncMap[message.Method](message)
- return rabbitmq.Ack
- })
- if err != nil {
- log.Println("消费消息错误:", err)
- InitMQ()
- }
- return
- }
- func processMessage(msg []byte) (*Message, error) {
- var message Message
- err := json.Unmarshal(msg, &message)
- if err != nil {
- log.Printf("反序列化消息错误:%s\n", err)
- return nil, err
- }
- return &message, nil
- }
- func newPublisher(conn *rabbitmq.Conn) (*rabbitmq.Publisher, error) {
- pub, err := rabbitmq.NewPublisher(
- conn,
- )
- if err != nil {
- return nil, err
- }
- pub.NotifyReturn(func(r rabbitmq.Return) {
- log.Printf("message returned from server: %s", string(r.Body))
- })
- pub.NotifyPublish(func(c rabbitmq.Confirmation) {
- log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
- })
- return pub, nil
- }
- func SendMessage(message Message, key string) error {
- conn, err := rabbitmq.NewConn(
- MQUrl,
- rabbitmq.WithConnectionOptionsLogging,
- )
- defer conn.Close()
- pub, err := newPublisher(conn)
- if err != nil {
- return err
- }
- marshal, err := json.Marshal(message)
- if err != nil {
- log.Printf("序列化消息错误: %s\n", err)
- return err
- }
- confirms, err := pub.PublishWithDeferredConfirmWithContext(
- context.Background(),
- marshal,
- []string{key},
- rabbitmq.WithPublishOptionsContentType("application/json"),
- rabbitmq.WithPublishOptionsPriority(0),
- rabbitmq.WithPublishOptionsExpiration("60000"),
- rabbitmq.WithPublishOptionsExchange(ResultExchangeName),
- )
- if err != nil {
- return err
- }
- if len(confirms) == 0 || confirms[0] == nil {
- log.Println("no confirms received")
- return errors.New("no confirmations received")
- }
- ok, err := confirms[0].WaitContext(context.Background())
- if err != nil {
- return err
- }
- if !ok {
- log.Println("message was nack'd")
- return errors.New("message was nack'd")
- }
- return nil
- }
|