diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index f7e0695ef7ab2..0dccdc60922c6 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -4,6 +4,7 @@ package event_hubs import ( "context" _ "embed" + "fmt" "time" eventhub "github.com/Azure/azure-event-hubs-go/v3" @@ -75,7 +76,6 @@ func (*EventHubs) SampleConfig() string { func (e *EventHubs) Init() error { err := e.Hub.GetHub(e.ConnectionString) - if err != nil { return err } @@ -96,7 +96,6 @@ func (e *EventHubs) Close() error { defer cancel() err := e.Hub.Close(ctx) - if err != nil { return err } @@ -112,7 +111,6 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { events := make([]*eventhub.Event, 0, len(metrics)) for _, metric := range metrics { payload, err := e.serializer.Serialize(metric) - if err != nil { e.Log.Debugf("Could not serialize metric: %v", err) continue @@ -136,8 +134,18 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { defer cancel() err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...) - if err != nil { + if err.Error() == "amqp: link closed" { + e.Log.Warn("Link is closed, reconnecting and retrying") + if err := e.Close(); err != nil { + return fmt.Errorf("could not close connection: %w", err) + } + err := e.Hub.GetHub(e.ConnectionString) + if err != nil { + return fmt.Errorf("could not reconnect: %w", err) + } + return e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...) + } return err }