Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding AMQP heartbeat/retry/retrydelay support #549

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,20 @@ type Rewriter struct {
}

type Amqp struct {
Amqp_enabled bool
Amqp_host string
Amqp_port int
Amqp_vhost string
Amqp_user string
Amqp_password string
Amqp_exchange string
Amqp_queue string
Amqp_key string
Amqp_durable bool
Amqp_exclusive bool
Amqp_enabled bool
Amqp_host string
Amqp_port int
Amqp_vhost string
Amqp_user string
Amqp_password string
Amqp_exchange string
Amqp_queue string
Amqp_key string
Amqp_durable bool
Amqp_exclusive bool
Amqp_heartbeat int
Amqp_retry bool
Amqp_retrydelay int
}

type Init struct {
Expand Down
3 changes: 3 additions & 0 deletions examples/carbon-relay-ng.ini
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ amqp_queue = ""
amqp_key = "#"
amqp_durable = false
amqp_exclusive = true
amqp_heartbeat = 70
amqp_retry = true
amqp_retrydelay = 30

# Aggregators
# See https://github.com/grafana/carbon-relay-ng/blob/master/docs/config.md#Aggregators
Expand Down
49 changes: 46 additions & 3 deletions input/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,55 @@ type amqpConnector func(a *Amqp) error
// AMQPConnector connects using the given configuration
func AMQPConnector(a *Amqp) error {
log.Infof("dialing AMQP: %v", a.uri)
conn, err := amqp.Dial(a.uri.String())
if err != nil {
return err

var conn *amqp.Connection
var err error

for {
config := amqp.Config{
Heartbeat: time.Duration(a.config.Amqp.Amqp_heartbeat) * time.Second,
}

conn, err = amqp.DialConfig(a.uri.String(), config)
if err == nil {
log.Printf("Successfully connected to AMQP server: %v.", a.uri)
break
}

if !a.config.Amqp.Amqp_retry {
return err
}

log.Errorf("Failed to connect to AMQP server: %v. Retrying in %d seconds...", err, a.config.Amqp.Amqp_retrydelay)
time.Sleep(time.Duration(a.config.Amqp.Amqp_retrydelay) * time.Second)
}

a.conn = conn

// Create a channel to receive close notifications from the connection
closeCh := make(chan *amqp.Error)
conn.NotifyClose(closeCh)

// Start a goroutine to monitor the connection state
go func() {
for {
select {
case <-closeCh:
log.Println("AMQP connection closed.")

if !a.config.Amqp.Amqp_retry {
log.Println("Retry is disabled. Exiting reconnection attempt.")
return
}

log.Println("Attempting to reconnect...")

AMQPConnector(a)
return
}
}
}()

amqpChan, err := conn.Channel()
if err != nil {
a.conn.Close()
Expand Down