This repository has been archived by the owner on Apr 14, 2022. It is now read-only.
forked from asticode/go-astiamqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
130 lines (114 loc) · 3.74 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package astiamqp
import (
"context"
"strconv"
"sync"
"github.com/molotovtv/go-astilog"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)
// Consumer represents a consumer registered on an AMQP instance, together with
// the state used to stop and restart the goroutine handling its deliveries.
type Consumer struct {
// cancel cancels ctx. setupConsumer calls it to stop a running delivery
// goroutine before setting the consumer up again.
cancel context.CancelFunc
// ctx is derived from the AMQP instance's context. The delivery goroutine
// returns once it is done.
ctx context.Context
// configuration is the configuration the consumer was added with (exchange,
// queue, routing key, handler and consume flags).
configuration ConfigurationConsumer
// handlingDeliveries is true once setupConsumer has started a goroutine
// handling deliveries for this consumer.
handlingDeliveries bool
// tag is the consumer tag passed to the channel when consuming. AddConsumer
// builds it from the AMQP instance's incrementing consumer id.
tag string
// wg tracks the delivery goroutine so that setupConsumer can wait for it to
// return.
wg *sync.WaitGroup
}
// AddConsumer creates a consumer from the provided configuration, sets it up
// (exchange and queue declaration, binding, consuming) and, on success, stores
// it in the list of consumers of the AMQP instance.
//
// The consumer tag is the decimal representation of an incrementing counter.
// The whole operation runs while holding the a.mc lock. The returned error is
// non-nil when the set up failed, in which case the consumer is not stored.
func (a *AMQP) AddConsumer(c ConfigurationConsumer) (err error) {
	// Lock
	a.mc.Lock()
	defer a.mc.Unlock()

	// Create consumer. No context is created here: setupConsumer always
	// creates the consumer's context itself, so creating one here as well
	// would only leave a cancel function that is never called.
	a.consumerId++
	var csm = &Consumer{
		configuration: c,
		tag:           strconv.Itoa(a.consumerId),
		wg:            &sync.WaitGroup{},
	}

	// Set up consumer
	if err = a.setupConsumer(csm); err != nil {
		// The consumer is discarded, release the context created during the
		// set up so that it doesn't stay attached to the parent context
		if csm.cancel != nil {
			csm.cancel()
		}
		err = errors.Wrapf(err, "astiamqp: setting up consumer %+v failed", csm)
		return
	}

	// Append consumer
	a.consumers = append(a.consumers, csm)
	return
}
// setupConsumer declares the exchange and the queue of the consumer, binds
// them with the configured routing key, starts consuming and spawns a
// goroutine passing every delivery to the configured handler.
//
// It can be called several times for the same consumer: the context created
// by a previous call is cancelled and the goroutine started by a previous call
// is waited for before anything else is done.
//
// The returned error is non-nil when declaring, binding or consuming failed,
// in which case no goroutine is started. Errors returned by the handler are
// logged and never returned.
func (a *AMQP) setupConsumer(c *Consumer) (err error) {
	// Release the previous context, if any, so that it doesn't stay attached
	// to the parent context, and wait for the previous goroutine to return
	if c.cancel != nil {
		c.cancel()
	}
	if c.handlingDeliveries {
		c.wg.Wait()
		c.handlingDeliveries = false
	}

	// Reset context. The goroutine below uses the local ctx instead of c.ctx
	// so that a later reset can't make it watch the wrong context.
	ctx, cancel := context.WithCancel(a.ctx)
	c.ctx, c.cancel = ctx, cancel

	// Declare exchange
	if err = a.declareExchange(c.configuration.Exchange); err != nil {
		err = errors.Wrapf(err, "astiamqp: declaring exchange %+v failed", c.configuration.Exchange)
		return
	}

	// Declare queue
	if err = a.declareQueue(c.configuration.Queue); err != nil {
		err = errors.Wrapf(err, "astiamqp: declaring queue %+v failed", c.configuration.Queue)
		return
	}

	// Bind queue
	if err = a.bindQueue(c.configuration.Queue, c.configuration.Exchange, c.configuration.RoutingKey); err != nil {
		err = errors.Wrapf(err, "astiamqp: binding queue %+v to exchange %+v for routing key %s failed", c.configuration.Queue, c.configuration.Exchange, c.configuration.RoutingKey)
		return
	}

	// Consume
	var deliveries <-chan amqp.Delivery
	if deliveries, err = a.consume(c); err != nil {
		err = errors.Wrapf(err, "astiamqp: consuming on consumer %+v failed", c.configuration)
		return
	}

	// Handle deliveries
	astilog.Debugf("astiamqp: handling deliveries of consumer %s on queue %s", c.tag, c.configuration.Queue.Name)

	// The waiting groups and the flag are updated before the goroutine is
	// started: doing it inside the goroutine would race with Wait and with
	// the next call to setupConsumer
	c.handlingDeliveries = true
	a.wg.Add(1)
	c.wg.Add(1)
	go func() {
		// Handle waiting groups
		defer func() {
			a.wg.Done()
			c.wg.Done()
		}()

		// Loop
		for {
			select {
			case d, ok := <-deliveries:
				// A closed channel yields zero values forever, stop here
				// instead of spinning on it
				if !ok {
					astilog.Debugf("astiamqp: deliveries channel of consumer %s has been closed", c.tag)
					return
				}
				if d.DeliveryTag > 0 {
					astilog.Debugf("astiamqp: received body %s on routing key %s, queue %s and exchange %s", string(d.Body), d.RoutingKey, c.configuration.Queue.Name, c.configuration.Exchange.Name)
					// Use a local error: the named result of setupConsumer
					// must not be written to once the function has returned
					if errHandler := c.configuration.Handler(d.Body, d.RoutingKey, acknowledger{deliveryTag: d.DeliveryTag, acknowledger: d.Acknowledger}); errHandler != nil {
						astilog.Error(errors.Wrapf(errHandler, "astiamqp: handling body %s on routing key %s, queue %s and exchange %s", string(d.Body), d.RoutingKey, c.configuration.Queue.Name, c.configuration.Exchange.Name))
					}
				}
			case <-ctx.Done():
				astilog.Debugf("astiamqp: stopping handling deliveries for consumer %s", c.tag)
				return
			}
		}
	}()
	return
}
// consume starts consuming on the consumer's queue through the AMQP channel,
// using the consumer tag and the flags and arguments of its configuration.
//
// It returns the channel on which deliveries are received. When the underlying
// Consume call fails, the error is wrapped with the consumer configuration.
func (a *AMQP) consume(c *Consumer) (deliveries <-chan amqp.Delivery, err error) {
	cfg := c.configuration
	astilog.Debugf("astiamqp: consuming on queue %s with consumer %s", cfg.Queue.Name, c.tag)

	// Arguments are, in order: queue, consumer, auto-ack, exclusive,
	// no-local, no-wait, args
	deliveries, err = a.channel.Consume(cfg.Queue.Name, c.tag, cfg.AutoAck, cfg.Exclusive, cfg.NoLocal, cfg.NoWait, amqp.Table(cfg.Arguments))
	if err != nil {
		err = errors.Wrapf(err, "astiamqp: consuming on consumer %+v failed", cfg)
	}
	return
}