diff --git a/internal/eventstream/client.go b/internal/eventstream/client.go index e8d1fc5df..fe510bb98 100644 --- a/internal/eventstream/client.go +++ b/internal/eventstream/client.go @@ -13,10 +13,10 @@ import ( "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "net/http" "net/url" "os" - "sync" "sync/atomic" "time" ) @@ -323,25 +323,29 @@ func (client *Client) enterReplayPhase() { return } - queryFns := []func(string){client.checkMissedAcknowledgements, client.checkMissedStateChanges} + queryFns := []func(string, context.Context) error{client.checkMissedAcknowledgements, client.checkMissedStateChanges} objTypes := []string{"host", "service"} - var replayWg sync.WaitGroup - replayWg.Add(len(queryFns) * len(objTypes)) - + group, groupCtx := errgroup.WithContext(client.Ctx) for _, fn := range queryFns { for _, objType := range objTypes { - go func(fn func(string), objType string) { - fn(objType) - replayWg.Done() - }(fn, objType) + fn, objType := fn, objType // https://go.dev/doc/faq#closures_and_goroutines + group.Go(func() error { + return fn(objType, groupCtx) + }) } } go func() { startTime := time.Now() - replayWg.Wait() - client.Logger.Debugw("All replay phase workers have finished", zap.Duration("duration", time.Since(startTime))) + + err := group.Wait() + if err != nil { + client.Logger.Errorw("Replaying the API resulted in errors", zap.Error(err), zap.Duration("duration", time.Since(startTime))) + } else { + client.Logger.Debugw("All replay phase workers have finished", zap.Duration("duration", time.Since(startTime))) + } + client.replayTrigger <- struct{}{} }() } diff --git a/internal/eventstream/client_api.go b/internal/eventstream/client_api.go index aaad44832..d1ba09ad6 100644 --- a/internal/eventstream/client_api.go +++ b/internal/eventstream/client_api.go @@ -178,42 +178,37 @@ func (client *Client) fetchAcknowledgementComment(host, service string, ackTime // If a filterExpr is given (non-empty string), it will be used for the query. Otherwise, all objects will be requested. // // The callback function will be called f.e. object of the objType (i.e. "host" or "service") being retrieved from the -// Icinga 2 Objects API. The callback function or a later caller must decide if this object should be replayed. -func (client *Client) checkMissedChanges(objType, filterExpr string, attrsCallbackFn func(attrs HostServiceRuntimeAttributes, host, service string)) { - var ( - logger = client.Logger.With(zap.String("object type", objType)) - - jsonRaw io.ReadCloser - err error - ) +// Icinga 2 Objects API sequentially. The callback function or a later caller decides if this object should be replayed. +func (client *Client) checkMissedChanges( + objType, filterExpr string, + attrsCallbackFn func(attrs HostServiceRuntimeAttributes, host, service string) error, +) (err error) { + logger := client.Logger.With(zap.String("object type", objType), zap.String("filter expr", filterExpr)) + + defer func() { + if err != nil { + logger.Errorw("Querying API for replay failed", zap.Error(err)) + } + }() + + var jsonRaw io.ReadCloser if filterExpr == "" { jsonRaw, err = client.queryObjectsApiDirect(objType, "") } else { jsonRaw, err = client.queryObjectsApiQuery(objType, map[string]any{"filter": filterExpr}) } if err != nil { - logger.Errorw("Querying API failed", zap.Error(err)) return } objQueriesResults, err := extractObjectQueriesResult[HostServiceRuntimeAttributes](jsonRaw) if err != nil { - logger.Errorw("Parsing API response failed", zap.Error(err)) - return - } - - if len(objQueriesResults) == 0 { return } logger.Debugw("Querying API resulted in state changes", zap.Int("changes", len(objQueriesResults))) for _, objQueriesResult := range objQueriesResults { - if client.Ctx.Err() != nil { - logger.Warnw("Stopping API response processing as context is finished", zap.Error(client.Ctx.Err())) - return - } - var hostName, serviceName string switch objQueriesResult.Type { case "Host": @@ -224,29 +219,31 @@ func (client *Client) checkMissedChanges(objType, filterExpr string, attrsCallba serviceName = objQueriesResult.Attrs.Name default: - logger.Errorw("Querying API delivered a wrong object type", zap.String("result type", objQueriesResult.Type)) - continue + err = fmt.Errorf("querying API delivered a wrong object type %q", objQueriesResult.Type) + return } - attrsCallbackFn(objQueriesResult.Attrs, hostName, serviceName) + err = attrsCallbackFn(objQueriesResult.Attrs, hostName, serviceName) + if err != nil { + return + } } + return } // checkMissedStateChanges fetches all objects of the requested type and feeds them into the handler. -func (client *Client) checkMissedStateChanges(objType string) { - client.checkMissedChanges(objType, "", func(attrs HostServiceRuntimeAttributes, host, service string) { - logger := client.Logger.With(zap.String("object type", objType)) - +func (client *Client) checkMissedStateChanges(objType string, ctx context.Context) error { + return client.checkMissedChanges(objType, "", func(attrs HostServiceRuntimeAttributes, host, service string) error { ev, err := client.buildHostServiceEvent(attrs.LastCheckResult, attrs.State, host, service) if err != nil { - logger.Errorw("Failed to construct Event from API", zap.Error(err)) - return + return fmt.Errorf("failed to construct Event from API, %w", err) } select { - case <-client.Ctx.Done(): - logger.Warnw("Cannot dispatch replayed event as context is finished", zap.Error(client.Ctx.Err())) + case <-ctx.Done(): + return ctx.Err() case client.eventDispatcherReplay <- &eventMsg{ev, attrs.LastStateChange.Time}: + return nil } }) } @@ -254,27 +251,24 @@ func (client *Client) checkMissedStateChanges(objType string) { // checkMissedAcknowledgements fetches all Host or Service Acknowledgements and feeds them into the handler. // // Currently only active acknowledgements are being processed. -func (client *Client) checkMissedAcknowledgements(objType string) { +func (client *Client) checkMissedAcknowledgements(objType string, ctx context.Context) error { filterExpr := fmt.Sprintf("%s.acknowledgement", objType) - client.checkMissedChanges(objType, filterExpr, func(attrs HostServiceRuntimeAttributes, host, service string) { - logger := client.Logger.With(zap.String("object type", objType)) - + return client.checkMissedChanges(objType, filterExpr, func(attrs HostServiceRuntimeAttributes, host, service string) error { ackComment, err := client.fetchAcknowledgementComment(host, service, attrs.AcknowledgementLastChange.Time) if err != nil { - logger.Errorw("Cannot fetch ACK Comment for Acknowledgement", zap.Error(err)) - return + return fmt.Errorf("cannot fetch ACK Comment for Acknowledgement, %w", err) } ev, err := client.buildAcknowledgementEvent(host, service, ackComment.Author, ackComment.Text) if err != nil { - logger.Errorw("Failed to construct Event from Acknowledgement API", zap.Error(err)) - return + return fmt.Errorf("failed to construct Event from Acknowledgement API, %w", err) } select { - case <-client.Ctx.Done(): - logger.Warnw("Cannot dispatch replayed event as context is finished", zap.Error(client.Ctx.Err())) + case <-ctx.Done(): + return ctx.Err() case client.eventDispatcherReplay <- &eventMsg{ev, attrs.AcknowledgementLastChange.Time}: + return nil } }) }