package RabbitMQ import ( "github.com/streadway/amqp" "log" ) func NewRabbitMQ(MQUrl string) RabbitMQ { var err error //创建链接 conn, err := amqp.Dial(MQUrl) if err != nil { log.Printf("Error: %v\n", err) panic(err) } //创建RabbitMQ实例 rabbitmq := RabbitMQ{ Conn: conn, } //获取channel rabbitmq.ConsumerChannel, err = rabbitmq.Conn.Channel() if err != nil { log.Printf("Error: %v\n", err) panic(err) } rabbitmq.PublisherChannel, err = rabbitmq.Conn.Channel() if err != nil { log.Printf("Error: %v\n", err) panic(err) } return rabbitmq } // NewQueue 新建队列 func (r *RabbitMQ) NewQueue(QueueName string, opts ...Opt) { for _, opt := range opts { opt() } _, err := r.Channel.QueueDeclare( QueueName, r.QueueConfig.durable, r.QueueConfig.autoDelete, r.QueueConfig.exclusive, r.QueueConfig.noWait, r.QueueConfig.args, ) if err != nil { log.Println(err) } } // NewExchange 新建交换机 func (r *RabbitMQ) NewExchange(Exchange string, kind string, opts ...Opt) { for _, opt := range opts { opt() } //1.申请交换机 err := r.Channel.ExchangeDeclare( //交换机名 Exchange, //交换机类型 kind, //是否持久化 r.ExchangeConfig.durable, //是否自动删除 r.ExchangeConfig.autoDelete, //是否具有排他性 r.ExchangeConfig.internal, //是否阻塞处理 r.ExchangeConfig.noWait, //额外的属性 r.ExchangeConfig.args, ) if err != nil { log.Println(err) panic(err) } } // Close 关闭channel和connection func (r *RabbitMQ) Close() { _ = r.Channel.Close() _ = r.Conn.Close() } func (r *RabbitMQ) QueueBind(QueueName, Exchange, Key string, opts ...Opt) error { for _, opt := range opts { opt() } return r.Channel.QueueBind(QueueName, Key, Exchange, r.QueueConfig.noWait, r.QueueConfig.args) } func (r *RabbitMQ) Consume(queueName, consumer string, opts ...Opt) (<-chan amqp.Delivery, error) { for _, opt := range opts { opt() } return r.Channel.Consume( queueName, // queue //用来区分多个消费者 consumer, // consumer //是否自动应答 r.autoAck, // auto-ack //是否独有 r.exclusive, // exclusive //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 r.noLocal, // no-local //列是否阻塞 r.noWait, // no-wait r.args, // args ) } func (r *RabbitMQ) Pub(msg []byte, exchange, key string) error { return r.Channel.Publish( exchange, key, r.mandatory, r.immediate, amqp.Publishing{ ContentType: "text/json", Body: msg, }, ) }