rabitmq.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package RabbitMQ
  2. import (
  3. "github.com/streadway/amqp"
  4. "log"
  5. )
  6. func NewRabbitMQ(MQUrl string) RabbitMQ {
  7. var err error
  8. //创建链接
  9. conn, err := amqp.Dial(MQUrl)
  10. if err != nil {
  11. log.Printf("Error: %v\n", err)
  12. panic(err)
  13. }
  14. //创建RabbitMQ实例
  15. rabbitmq := RabbitMQ{
  16. Conn: conn,
  17. }
  18. //获取channel
  19. rabbitmq.ConsumerChannel, err = rabbitmq.Conn.Channel()
  20. if err != nil {
  21. log.Printf("Error: %v\n", err)
  22. panic(err)
  23. }
  24. rabbitmq.PublisherChannel, err = rabbitmq.Conn.Channel()
  25. if err != nil {
  26. log.Printf("Error: %v\n", err)
  27. panic(err)
  28. }
  29. return rabbitmq
  30. }
  31. // NewQueue 新建队列
  32. func (r *RabbitMQ) NewQueue(QueueName string, opts ...Opt) {
  33. for _, opt := range opts {
  34. opt()
  35. }
  36. _, err := r.Channel.QueueDeclare(
  37. QueueName,
  38. r.QueueConfig.durable,
  39. r.QueueConfig.autoDelete,
  40. r.QueueConfig.exclusive,
  41. r.QueueConfig.noWait,
  42. r.QueueConfig.args,
  43. )
  44. if err != nil {
  45. log.Println(err)
  46. }
  47. }
  48. // NewExchange 新建交换机
  49. func (r *RabbitMQ) NewExchange(Exchange string, kind string, opts ...Opt) {
  50. for _, opt := range opts {
  51. opt()
  52. }
  53. //1.申请交换机
  54. err := r.Channel.ExchangeDeclare(
  55. //交换机名
  56. Exchange,
  57. //交换机类型
  58. kind,
  59. //是否持久化
  60. r.ExchangeConfig.durable,
  61. //是否自动删除
  62. r.ExchangeConfig.autoDelete,
  63. //是否具有排他性
  64. r.ExchangeConfig.internal,
  65. //是否阻塞处理
  66. r.ExchangeConfig.noWait,
  67. //额外的属性
  68. r.ExchangeConfig.args,
  69. )
  70. if err != nil {
  71. log.Println(err)
  72. panic(err)
  73. }
  74. }
  75. // Close 关闭channel和connection
  76. func (r *RabbitMQ) Close() {
  77. _ = r.Channel.Close()
  78. _ = r.Conn.Close()
  79. }
  80. func (r *RabbitMQ) QueueBind(QueueName, Exchange, Key string, opts ...Opt) error {
  81. for _, opt := range opts {
  82. opt()
  83. }
  84. return r.Channel.QueueBind(QueueName, Key, Exchange, r.QueueConfig.noWait, r.QueueConfig.args)
  85. }
  86. func (r *RabbitMQ) Consume(queueName, consumer string, opts ...Opt) (<-chan amqp.Delivery, error) {
  87. for _, opt := range opts {
  88. opt()
  89. }
  90. return r.Channel.Consume(
  91. queueName, // queue
  92. //用来区分多个消费者
  93. consumer, // consumer
  94. //是否自动应答
  95. r.autoAck, // auto-ack
  96. //是否独有
  97. r.exclusive, // exclusive
  98. //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  99. r.noLocal, // no-local
  100. //列是否阻塞
  101. r.noWait, // no-wait
  102. r.args, // args
  103. )
  104. }
  105. func (r *RabbitMQ) Pub(msg []byte, exchange, key string) error {
  106. return r.Channel.Publish(
  107. exchange,
  108. key,
  109. r.mandatory,
  110. r.immediate,
  111. amqp.Publishing{
  112. ContentType: "text/json",
  113. Body: msg,
  114. },
  115. )
  116. }