-
Notifications
You must be signed in to change notification settings - Fork 46
/
consumer.go
154 lines (135 loc) · 2.85 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package cony
import (
"fmt"
"os"
"sync"
"github.com/streadway/amqp"
)
// ConsumerOpt is a consumer's functional option type
type ConsumerOpt func(*Consumer)
// Consumer holds definition for AMQP consumer
type Consumer struct {
q *Queue
deliveries chan amqp.Delivery
errs chan error
qos int
tag string
autoAck bool
exclusive bool
noLocal bool
stop chan struct{}
dead bool
m sync.Mutex
}
// Deliveries return deliveries shipped to this consumer
// this channel never closed, even on disconnects
func (c *Consumer) Deliveries() <-chan amqp.Delivery {
return c.deliveries
}
// Errors returns channel with AMQP channel level errors
func (c *Consumer) Errors() <-chan error {
return c.errs
}
// Cancel this consumer.
//
// This will CLOSE Deliveries() channel
func (c *Consumer) Cancel() {
c.m.Lock()
defer c.m.Unlock()
if !c.dead {
close(c.deliveries)
close(c.stop)
c.dead = true
}
}
func (c *Consumer) reportErr(err error) bool {
if err != nil {
select {
case c.errs <- err:
default:
}
return true
}
return false
}
func (c *Consumer) serve(client mqDeleter, ch mqChannel) {
if c.reportErr(ch.Qos(c.qos, 0, false)) {
return
}
deliveries, err2 := ch.Consume(c.q.Name,
c.tag, // consumer tag
c.autoAck, // autoAck,
c.exclusive, // exclusive,
c.noLocal, // noLocal,
false, // noWait,
nil, // args Table
)
if c.reportErr(err2) {
return
}
for {
select {
case <-c.stop:
client.deleteConsumer(c)
ch.Close()
return
case d, ok := <-deliveries: // deliveries will be closed once channel is closed (disconnected from network)
if !ok {
return
}
c.deliveries <- d
}
}
}
// NewConsumer Consumer's constructor
func NewConsumer(q *Queue, opts ...ConsumerOpt) *Consumer {
c := &Consumer{
q: q,
deliveries: make(chan amqp.Delivery),
errs: make(chan error, 100),
stop: make(chan struct{}),
}
for _, o := range opts {
o(c)
}
return c
}
// Qos on channel
func Qos(count int) ConsumerOpt {
return func(c *Consumer) {
c.qos = count
}
}
// Tag the consumer
func Tag(tag string) ConsumerOpt {
return func(c *Consumer) {
c.tag = tag
}
}
// AutoTag set automatically generated tag like this
// fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
func AutoTag() ConsumerOpt {
return func(c *Consumer) {
host, _ := os.Hostname()
tag := fmt.Sprintf(c.q.Name+"-pid-%d@%s", os.Getpid(), host)
Tag(tag)(c)
}
}
// AutoAck set this consumer in AutoAck mode
func AutoAck() ConsumerOpt {
return func(c *Consumer) {
c.autoAck = true
}
}
// Exclusive set this consumer in exclusive mode
func Exclusive() ConsumerOpt {
return func(c *Consumer) {
c.exclusive = true
}
}
// NoLocal set this consumer in NoLocal mode.
func NoLocal() ConsumerOpt {
return func(c *Consumer) {
c.noLocal = true
}
}