entity.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package RabbitMQ
  2. import "github.com/streadway/amqp"
  3. // rabbitMQ结构体
  4. type RabbitMQ struct {
  5. Conn *amqp.Connection
  6. ConsumerChannel *amqp.Channel
  7. PublisherChannel *amqp.Channel
  8. QueueConfig QueueConfig
  9. ExchangeConfig ExchangeConfig
  10. Consumer
  11. Publisher
  12. }
  13. type Opt func()
  14. type Consumer struct {
  15. autoAck bool //是否自动应答
  16. exclusive bool //是否独有
  17. noLocal bool // //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  18. noWait bool //列是否阻塞
  19. args amqp.Table // 额外的属性
  20. }
  21. // AutoAck 自动应答
  22. func (c *Consumer) AutoAck() Opt {
  23. return func() {
  24. c.autoAck = true
  25. }
  26. }
  27. // Exclusive 独有
  28. func (c *Consumer) Exclusive() Opt {
  29. return func() {
  30. c.exclusive = true
  31. }
  32. }
  33. // NoLocal 设置为true,表示不能将同一个Conenction中生产者发送的消息传递给这个Connection中的消费者
  34. func (c *Consumer) NoLocal() Opt {
  35. return func() {
  36. c.noLocal = true
  37. }
  38. }
  39. // NoWait 是否阻塞
  40. func (c *Consumer) NoWait() Opt {
  41. return func() {
  42. c.noWait = true
  43. }
  44. }
  45. type Publisher struct {
  46. mandatory bool //
  47. immediate bool //
  48. }
  49. // Mandatory 是否将无法路由到任何队列的消息返回
  50. func (p *Publisher) Mandatory() Opt {
  51. return func() {
  52. p.mandatory = true
  53. }
  54. }
  55. // Immediate 当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
  56. func (p *Publisher) Immediate() Opt {
  57. return func() {
  58. p.immediate = true
  59. }
  60. }
  61. // 配置
  62. type QueueConfig struct {
  63. durable bool // 是否持久化
  64. autoDelete bool // 是否自动删除
  65. exclusive bool // 是否独占连接
  66. noWait bool // 是否阻塞处理
  67. args amqp.Table // 额外的属性
  68. }
  69. // Durable 队列持久化
  70. func (c *QueueConfig) Durable() Opt {
  71. return func() {
  72. c.durable = true
  73. }
  74. }
  75. // AutoDelete 队列自动删除
  76. func (c *QueueConfig) AutoDelete() Opt {
  77. return func() {
  78. c.autoDelete = true
  79. }
  80. }
  81. // Exclusive 队列独占连接
  82. func (c *QueueConfig) Exclusive() Opt {
  83. return func() {
  84. c.exclusive = true
  85. }
  86. }
  87. // NoWait 是否阻塞
  88. func (c *QueueConfig) NoWait() Opt {
  89. return func() {
  90. c.noWait = true
  91. }
  92. }
  93. // Args 额外的属性
  94. func (c *QueueConfig) Args(args amqp.Table) Opt {
  95. return func() {
  96. c.args = args
  97. }
  98. }
  99. type ExchangeConfig struct {
  100. durable bool //是否持久化
  101. autoDelete bool //是否自动删除
  102. internal bool // 是否内部处理
  103. noWait bool //是否阻塞处理
  104. args amqp.Table //额外的属性
  105. }
  106. // Durable 交换机持久化
  107. func (c ExchangeConfig) Durable() Opt {
  108. return func() {
  109. c.durable = true
  110. }
  111. }
  112. // AutoDelete 交换机自动删除
  113. func (c *ExchangeConfig) AutoDelete() Opt {
  114. return func() {
  115. c.autoDelete = true
  116. }
  117. }
  118. // Internal 是否内部处理
  119. func (c *ExchangeConfig) Internal() Opt {
  120. return func() {
  121. c.internal = true
  122. }
  123. }
  124. // NoWait 是否阻塞
  125. func (c *ExchangeConfig) NoWait() Opt {
  126. return func() {
  127. c.noWait = true
  128. }
  129. }
  130. // Args 额外的属性
  131. func (c *ExchangeConfig) Args(args amqp.Table) Opt {
  132. return func() {
  133. c.args = args
  134. }
  135. }