From fc0236c7b03909203ca53739cd8056fb80bbfeda Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Tue, 19 Mar 2024 10:51:50 +0100 Subject: [PATCH] icinga2: Client fixes - Rate limit catch-up-phase worker start. In case of a network disruption during the catch-up-phase, this will result in an error and infinite retries. Those, however, might result in lots of useless logging, which can be rate limited. - Remove the both useless and broken catchupEventCh drainage logic. All sends are being protected by context checks. - Abort early on errors received from the catchupEventCh and don't store them for later. --- internal/icinga2/client.go | 57 ++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/internal/icinga2/client.go b/internal/icinga2/client.go index ae88669cb..2cb417a8a 100644 --- a/internal/icinga2/client.go +++ b/internal/icinga2/client.go @@ -6,6 +6,7 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "math" "net/http" "net/url" "time" @@ -21,8 +22,8 @@ type eventMsg struct { // catchupEventMsg propagates either an eventMsg or an error back from the catch-up worker. // -// The type must be used as a sum-type like data structure holding either an eventMsg pointer or an error. The error -// should have a higher precedence than the eventMsg. +// The type must be used as a sum-type like data structure holding either an error or an eventMsg pointer. The error has +// a higher precedence than the eventMsg. type catchupEventMsg struct { *eventMsg error @@ -196,7 +197,10 @@ func (client *Client) buildAcknowledgementEvent(ctx context.Context, host, servi // // Those workers honor a context derived from the Client.Ctx and would either stop when this context is done or when the // context.CancelFunc is called. -func (client *Client) startCatchupWorkers() (chan *catchupEventMsg, context.CancelFunc) { +// +// The startup time might be delayed through the parameter. This lets the goroutines sleep to rate-limit reconnection +// attempts during network hiccups. +func (client *Client) startCatchupWorkers(delay time.Duration) (chan *catchupEventMsg, context.CancelFunc) { startTime := time.Now() catchupEventCh := make(chan *catchupEventMsg) @@ -208,6 +212,12 @@ func (client *Client) startCatchupWorkers() (chan *catchupEventMsg, context.Canc for _, objType := range objTypes { objType := objType // https://go.dev/doc/faq#closures_and_goroutines group.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + err := client.checkMissedChanges(groupCtx, objType, catchupEventCh) if err != nil { client.Logger.Errorw("Catch-up-phase event worker failed", zap.String("object type", objType), zap.Error(err)) @@ -261,8 +271,9 @@ func (client *Client) worker() { // catchupCache maps event.Events.Name to API time to skip replaying outdated events. catchupCache = make(map[string]time.Time) - // catchupErr might hold an error received from catchupEventCh, indicating another catch-up-phase run. - catchupErr error + // catchupFailCounter indicates how many prior catch-up-phase attempts have failed. It will be used to + // rate limit catch-up-phase restarts. + catchupFailCounter int ) // catchupReset resets all catchup variables to their initial empty state. @@ -270,7 +281,6 @@ func (client *Client) worker() { catchupEventCh, catchupCancel = nil, nil catchupBuffer = make([]*event.Event, 0) catchupCache = make(map[string]time.Time) - catchupErr = nil } // catchupCacheUpdate updates the catchupCache if this eventMsg seems to be the latest of its kind. @@ -290,18 +300,13 @@ func (client *Client) worker() { case <-client.catchupPhaseRequest: if catchupEventCh != nil { client.Logger.Warn("Switching to catch-up-phase was requested while already catching up, restarting phase") - - // Drain the old catch-up-phase producer channel until it is closed as its context will be canceled. - go func(catchupEventCh chan *catchupEventMsg) { - for _, ok := <-catchupEventCh; ok; { - } - }(catchupEventCh) catchupCancel() } client.Logger.Info("Worker enters catch-up-phase, start caching up on Event Stream events") catchupReset() - catchupEventCh, catchupCancel = client.startCatchupWorkers() + catchupEventCh, catchupCancel = client.startCatchupWorkers( + min(3*time.Minute, time.Duration(math.Exp2(float64(catchupFailCounter)))*time.Second-time.Second)) case catchupMsg, ok := <-catchupEventCh: // Process an incoming event @@ -311,9 +316,17 @@ func (client *Client) worker() { break } - // Store an incoming error as the catchupErr to be processed below + // Abort and restart the catch-up-phase when receiving an error. if ok && catchupMsg.error != nil { - catchupErr = catchupMsg.error + client.Logger.Warnw("Worker leaves catch-up-phase with an error, another attempt will be made", zap.Error(catchupMsg.error)) + go func() { + select { + case <-client.Ctx.Done(): + case client.catchupPhaseRequest <- struct{}{}: + } + }() + catchupReset() + catchupFailCounter++ break } @@ -336,19 +349,9 @@ func (client *Client) worker() { break } - if catchupErr != nil { - client.Logger.Warnw("Worker leaves catch-up-phase with an error, another attempt will be made", zap.Error(catchupErr)) - go func() { - select { - case <-client.Ctx.Done(): - case client.catchupPhaseRequest <- struct{}{}: - } - }() - } else { - client.Logger.Info("Worker leaves catch-up-phase, returning to normal operation") - } - + client.Logger.Info("Worker leaves catch-up-phase, returning to normal operation") catchupReset() + catchupFailCounter = 0 case ev := <-client.eventDispatcherEventStream: // During catch-up-phase, buffer Event Stream events