123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- package RabbitMQ
- import (
- "github.com/streadway/amqp"
- "log"
- )
- func NewRabbitMQ(MQUrl string) RabbitMQ {
- var err error
- //创建链接
- conn, err := amqp.Dial(MQUrl)
- if err != nil {
- log.Printf("Error: %v\n", err)
- panic(err)
- }
- //创建RabbitMQ实例
- rabbitmq := RabbitMQ{
- Conn: conn,
- }
- //获取channel
- rabbitmq.ConsumerChannel, err = rabbitmq.Conn.Channel()
- if err != nil {
- log.Printf("Error: %v\n", err)
- panic(err)
- }
- rabbitmq.PublisherChannel, err = rabbitmq.Conn.Channel()
- if err != nil {
- log.Printf("Error: %v\n", err)
- panic(err)
- }
- return rabbitmq
- }
- // NewQueue 新建队列
- func (r *RabbitMQ) NewQueue(QueueName string, opts ...Opt) {
- for _, opt := range opts {
- opt()
- }
- _, err := r.Channel.QueueDeclare(
- QueueName,
- r.QueueConfig.durable,
- r.QueueConfig.autoDelete,
- r.QueueConfig.exclusive,
- r.QueueConfig.noWait,
- r.QueueConfig.args,
- )
- if err != nil {
- log.Println(err)
- }
- }
- // NewExchange 新建交换机
- func (r *RabbitMQ) NewExchange(Exchange string, kind string, opts ...Opt) {
- for _, opt := range opts {
- opt()
- }
- //1.申请交换机
- err := r.Channel.ExchangeDeclare(
- //交换机名
- Exchange,
- //交换机类型
- kind,
- //是否持久化
- r.ExchangeConfig.durable,
- //是否自动删除
- r.ExchangeConfig.autoDelete,
- //是否具有排他性
- r.ExchangeConfig.internal,
- //是否阻塞处理
- r.ExchangeConfig.noWait,
- //额外的属性
- r.ExchangeConfig.args,
- )
- if err != nil {
- log.Println(err)
- panic(err)
- }
- }
- // Close 关闭channel和connection
- func (r *RabbitMQ) Close() {
- _ = r.Channel.Close()
- _ = r.Conn.Close()
- }
- func (r *RabbitMQ) QueueBind(QueueName, Exchange, Key string, opts ...Opt) error {
- for _, opt := range opts {
- opt()
- }
- return r.Channel.QueueBind(QueueName, Key, Exchange, r.QueueConfig.noWait, r.QueueConfig.args)
- }
- func (r *RabbitMQ) Consume(queueName, consumer string, opts ...Opt) (<-chan amqp.Delivery, error) {
- for _, opt := range opts {
- opt()
- }
- return r.Channel.Consume(
- queueName, // queue
- //用来区分多个消费者
- consumer, // consumer
- //是否自动应答
- r.autoAck, // auto-ack
- //是否独有
- r.exclusive, // exclusive
- //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
- r.noLocal, // no-local
- //列是否阻塞
- r.noWait, // no-wait
- r.args, // args
- )
- }
- func (r *RabbitMQ) Pub(msg []byte, exchange, key string) error {
- return r.Channel.Publish(
- exchange,
- key,
- r.mandatory,
- r.immediate,
- amqp.Publishing{
- ContentType: "text/json",
- Body: msg,
- },
- )
- }
|