diff --git a/internal/daemon/config.go b/internal/daemon/config.go index 72e9742d..ea188020 100644 --- a/internal/daemon/config.go +++ b/internal/daemon/config.go @@ -101,3 +101,20 @@ func ParseFlagsAndConfig() { utils.PrintErrorThenExit(err, ExitFailure) } } + +// InitTestConfig initialises the global daemon config instance and applies the defaults. +// This should be used for unit tests only. +func InitTestConfig() error { + daemonConfig = new(ConfigFile) + if err := defaults.Set(daemonConfig); err != nil { + return err + } + if err := defaults.Set(&daemonConfig.Database); err != nil { + return err + } + if err := defaults.Set(&daemonConfig.Logging); err != nil { + return err + } + + return nil +} diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 00000000..33803fdb --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,29 @@ +package events + +import ( + "context" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" +) + +// Process processes the specified event.Event. +// +// Please note that this function is the only way to access the internal events.router type. +// +// The returned error might be wrapped around event.ErrSuperfluousStateChange. +func Process(ctx context.Context, db *database.DB, logs *logging.Logging, rc *config.RuntimeConfig, ev *event.Event) error { + r := &router{ + logs: logs, + Evaluable: config.NewEvaluable(), + Notifier: notification.Notifier{ + DB: db, + RuntimeConfig: rc, + Logger: logs.GetChildLogger("routing").SugaredLogger, + }, + } + + return r.route(ctx, ev) +} diff --git a/internal/events/events_test.go b/internal/events/events_test.go new file mode 100644 index 00000000..6526fd21 --- /dev/null +++ b/internal/events/events_test.go @@ -0,0 +1,156 @@ +package events + +import ( + "context" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/daemon" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/testutils" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "testing" + "time" +) + +func TestProcess(t *testing.T) { + ctx := context.Background() + db := testutils.GetTestDB(ctx, t) + + require.NoError(t, daemon.InitTestConfig(), "mocking daemon.Config should not fail") + + // Insert a dummy source for our test cases! + source := config.Source{Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + source.ChangedAt = types.UnixMilli(time.Now()) + source.Deleted = types.Bool{Bool: false, Valid: true} + + err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { + id, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, source, "id"), source) + require.NoError(t, err, "populating source table should not fail") + + source.ID = id + return nil + }) + require.NoError(t, err, "utils.RunInTx() should not fail") + + logs, err := logging.NewLogging("events-router", zapcore.DebugLevel, "console", nil, time.Hour) + require.NoError(t, err, "logging initialisation should not fail") + + runtimeConfig := new(config.RuntimeConfig) + + t.Run("InvalidEvents", func(t *testing.T) { + assert.Nil(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeState, event.SeverityNone))) + assert.ErrorIs(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeState, event.SeverityOK)), event.ErrSuperfluousStateChange) + assert.ErrorIs(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeAcknowledgementSet, event.SeverityOK)), event.ErrSuperfluousStateChange) + assert.ErrorIs(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeAcknowledgementCleared, event.SeverityOK)), event.ErrSuperfluousStateChange) + }) + + t.Run("StateChangeEvents", func(t *testing.T) { + states := map[string]*event.Event{ + "crit": makeEvent(t, source.ID, event.TypeState, event.SeverityCrit), + "warn": makeEvent(t, source.ID, event.TypeState, event.SeverityWarning), + "err": makeEvent(t, source.ID, event.TypeState, event.SeverityErr), + "alert": makeEvent(t, source.ID, event.TypeState, event.SeverityAlert), + } + + for severity, ev := range states { + assert.NoErrorf(t, Process(ctx, db, logs, runtimeConfig, ev), "state event with severity %q should open an incident", severity) + assert.ErrorIsf(t, Process(ctx, db, logs, runtimeConfig, ev), event.ErrSuperfluousStateChange, + "superfluous state event %q should be ignored", severity) + + obj := object.GetFromCache(object.ID(source.ID, ev.Tags)) + require.NotNil(t, obj, "there should be a cached object") + + i, err := incident.GetCurrent(ctx, db, obj, logs.GetLogger(), runtimeConfig, false) + require.NoError(t, err, "retrieving current incident should not fail") + require.NotNil(t, i, "there should be a cached incident") + assert.Equal(t, ev.Severity, i.Severity, "severities should be equal") + } + + reloadIncidents := func(ctx context.Context) { + object.ClearCache() + + // Remove all existing incidents from the cache, as they are indexed with the + // pointer of their object, which is going to change! + for _, i := range incident.GetCurrentIncidents() { + incident.RemoveCurrent(i.Object) + } + + // The incident loading process may hang due to unknown bugs or semaphore lock waits. + // Therefore, give it maximum time of 10s to finish normally, otherwise give up and fail. + ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + defer cancelFunc() + + err := incident.LoadOpenIncidents(ctx, db, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour), runtimeConfig) + require.NoError(t, err, "loading active incidents should not fail") + } + reloadIncidents(ctx) + + for severity, ev := range states { + obj, err := object.FromEvent(ctx, db, ev) + assert.NoError(t, err) + + i, err := incident.GetCurrent(ctx, db, obj, logs.GetLogger(), runtimeConfig, false) + assert.NoErrorf(t, err, "incident for event severity %q should be in cache", severity) + + assert.Equal(t, obj, i.Object, "incident and event object should be the same") + assert.Equal(t, i.Severity, ev.Severity, "incident and event severity should be the same") + } + + // Recover the incidents + for _, ev := range states { + ev.Time = time.Now() + ev.Severity = event.SeverityOK + + assert.NoErrorf(t, Process(ctx, db, logs, runtimeConfig, ev), "state event with severity %q should close an incident", "ok") + } + reloadIncidents(ctx) + assert.Len(t, incident.GetCurrentIncidents(), 0, "there should be no cached incidents") + }) + + t.Run("NonStateEvents", func(t *testing.T) { + events := []*event.Event{ + makeEvent(t, source.ID, event.TypeDowntimeStart, event.SeverityNone), + makeEvent(t, source.ID, event.TypeDowntimeEnd, event.SeverityNone), + makeEvent(t, source.ID, event.TypeDowntimeRemoved, event.SeverityNone), + makeEvent(t, source.ID, event.TypeCustom, event.SeverityNone), + makeEvent(t, source.ID, event.TypeFlappingStart, event.SeverityNone), + makeEvent(t, source.ID, event.TypeFlappingEnd, event.SeverityNone), + } + + for _, ev := range events { + assert.NoErrorf(t, Process(ctx, db, logs, runtimeConfig, ev), "processing non-state event %q should not fail", ev.Type) + assert.Lenf(t, incident.GetCurrentIncidents(), 0, "non-state event %q should not open an incident", ev.Type) + require.NotNil(t, object.GetFromCache(object.ID(source.ID, ev.Tags)), "there should be a cached object") + } + }) +} + +// makeEvent creates a fully initialised event.Event of the given type and severity. +func makeEvent(t *testing.T, sourceID int64, typ string, severity event.Severity) *event.Event { + return &event.Event{ + SourceId: sourceID, + Name: testutils.MakeRandomString(t), + URL: "https://localhost/icingaweb2/icingadb", + Type: typ, + Time: time.Now(), + Severity: severity, + Username: "icingaadmin", + Message: "You will contract a rare disease :(", + Tags: map[string]string{ + "Host": testutils.MakeRandomString(t), + "Service": testutils.MakeRandomString(t), + }, + ExtraTags: map[string]string{ + "hostgroup/database-server": "", + "servicegroup/webserver": "", + }, + } +} diff --git a/internal/events/router.go b/internal/events/router.go new file mode 100644 index 00000000..ce1dde08 --- /dev/null +++ b/internal/events/router.go @@ -0,0 +1,225 @@ +package events + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/notification" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// router dispatches all incoming events to their corresponding handlers and provides a default one if there is none. +// +// You should always use this type to handle events properly and shouldn't try to bypass it +// by accessing other handlers directly. +type router struct { + // notification.Notifier is a helper type used to send notifications. + // It is embedded to allow direct access to its members, such as logger, DB etc. + notification.Notifier + + // config.Evaluable encapsulates all evaluable configuration types, such as rule.Rule, rule.Entry etc. + // It is embedded to enable direct access to its members. + *config.Evaluable + + logs *logging.Logging +} + +// route routes the specified event.Event to its corresponding handler. +// +// This function first constructs the target object.Object and its incident.Incident from the provided event.Event. +// After some safety checks have been carried out, the event is then handed over to the process method. +// +// Returns an error if it fails to successfully route/process the provided event. +func (r *router) route(ctx context.Context, ev *event.Event) error { + var wasObjectMuted bool + if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { + wasObjectMuted = obj.IsMuted() + } + + obj, err := object.FromEvent(ctx, r.DB, ev) + if err != nil { + r.Logger.Errorw("Failed to generate object from event", zap.Stringer("event", ev), zap.Error(err)) + return err + } + + r.Logger = r.Logger.With(zap.String("object", obj.DisplayName()), zap.Stringer("event", ev)) + + createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK + currentIncident, err := incident.GetCurrent(ctx, r.DB, obj, r.logs.GetChildLogger("incident"), r.RuntimeConfig, createIncident) + if err != nil { + r.Logger.Errorw("Failed to create/determine an incident", zap.Error(err)) + return err + } + + if currentIncident == nil { + switch { + case ev.Severity == event.SeverityNone: + // We need to ignore superfluous mute and unmute events here, as would be the case with an existing + // incident, otherwise the event stream catch-up phase will generate useless events after each + // Icinga 2 reload and overwhelm the database with the very same mute/unmute events. + if wasObjectMuted && ev.Type == event.TypeMute { + return event.ErrSuperfluousMuteUnmuteEvent + } + if !wasObjectMuted && ev.Type == event.TypeUnmute { + return event.ErrSuperfluousMuteUnmuteEvent + } + case ev.Severity == event.SeverityOK: + r.Logger.Debugw("Cannot process OK state event", zap.Int64("source_id", ev.SourceId)) + return errors.Wrapf(event.ErrSuperfluousStateChange, "OK state event from source %d", ev.SourceId) + default: + panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity)) + } + } + + return r.process(ctx, obj, ev, currentIncident, wasObjectMuted) +} + +// process processes the provided event and notifies the recipients of the resulting notifications in a non-blocking manner. +// You should be aware, though, that this method might block competing events that refer to the same incident.Incident. +// +// process processes the specified event in an own transaction and rolls back any changes made to the database +// if it returns with an error. However, it should be noted that notifications are triggered outside a database +// transaction initiated after successful event processing and will not undo the changes made by the event processing +// tx if sending the notifications fails. +// +// Returns an error in case of internal processing errors. +func (r *router) process(ctx context.Context, obj *object.Object, ev *event.Event, currentIncident *incident.Incident, wasObjMuted bool) error { + tx, err := r.DB.BeginTxx(ctx, nil) + if err != nil { + r.Logger.Errorw("Failed to start a database transaction", zap.Error(err)) + return err + } + defer func() { _ = tx.Rollback() }() + + if err := ev.Sync(ctx, tx, r.DB, obj.ID); err != nil { + r.Logger.Errorw("Failed to sync an event to the database", zap.Error(err)) + return err + } + + r.RuntimeConfig.RLock() + defer r.RuntimeConfig.RUnlock() + + if currentIncident != nil { + currentIncident.Lock() + defer currentIncident.Unlock() + + if err := currentIncident.ProcessEvent(ctx, tx, ev); err != nil { + return err + } + } + + // EvaluateRules only returns an error if one of the provided callback hooks returns + // an error or the OnError handler returns false, and since none of our callbacks return + // an error nor false, we can safely discard the return value here. + _ = r.EvaluateRules(r.RuntimeConfig, obj, config.EvalOptions[*rule.Rule, any]{ + OnPreEvaluate: func(r *rule.Rule) bool { return r.Type == rule.TypeRouting }, + OnFilterMatch: func(ru *rule.Rule) error { + r.Logger.Infow("Rule matches", zap.Object("rule", ru)) + return nil + }, + OnError: func(ru *rule.Rule, err error) bool { + r.Logger.Warnw("Failed to evaluate non-state rule condition", zap.Object("rule", ru), zap.Error(err)) + return true + }, + }) + + filterContext := &rule.RoutingFilter{EventType: ev.Type} + // EvaluateRuleEntries only returns an error if one of the provided callback hooks returns + // an error or the OnError handler returns false, and since none of our callbacks return an + // error nor false, we can safely discard the return value here. + _ = r.EvaluateRuleEntries(r.RuntimeConfig, filterContext, config.EvalOptions[*rule.Entry, any]{ + OnFilterMatch: func(route *rule.Entry) error { + ru := r.RuntimeConfig.Rules[route.RuleID] + r.Logger.Debugw("Routing condition matches", zap.Object("rule", ru), zap.Object("rule_routing", route)) + return nil + }, + OnError: func(route *rule.Entry, err error) bool { + ru := r.RuntimeConfig.Rules[route.RuleID] + r.Logger.Warnw("Failed to evaluate routing condition", + zap.Object("rule", ru), + zap.Object("rule_routing", route), + zap.Error(err)) + return true + }, + }) + + var incidentID int64 + notifications := make(notification.PendingNotifications) + if currentIncident != nil { + incidentID = currentIncident.ID + notifications, err = currentIncident.GenerateNotifications(ctx, tx, ev, currentIncident.GetRecipientsChannel(ev.Time)) + if err != nil { + r.Logger.Errorw("Failed to generate incident notifications", zap.Error(err)) + return err + } + } + if err := r.generateNotifications(ctx, tx, ev, wasObjMuted && obj.IsMuted(), incidentID, notifications); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + r.Logger.Errorw("Cannot commit database transaction", zap.Error(err)) + return err + } + + if currentIncident != nil { + // We've just committed the DB transaction and can safely update the incident muted flag. + currentIncident.RefreshIsMuted() + return currentIncident.NotifyContacts(ctx, currentIncident.MakeNotificationRequest(ev), notifications) + } + + if err := r.NotifyContacts(ctx, notification.NewPluginRequest(obj, ev), notifications); err != nil { + r.Logger.Errorw("Failed to send all pending notifications", zap.Error(err)) + return err + } + + return nil +} + +// generateNotifications generates non-state notifications and loads them into the provided map. +// +// Returns an error if it fails to persist the generated pending/suppressed notifications to the database. +func (r *router) generateNotifications( + ctx context.Context, tx *sqlx.Tx, ev *event.Event, suppressed bool, incidentID int64, + notifications notification.PendingNotifications, +) error { + for _, route := range r.RuleEntries { + channels := make(rule.ContactChannels) + channels.LoadFromEntryRecipients(route, ev.Time, rule.AlwaysNotifiable) + if len(channels) == 0 { + r.Logger.Warnw("Rule routing expanded to no contacts", + zap.Object("rule_routing", route)) + continue + } + + histories, err := notification.AddNotifications(ctx, r.DB, tx, channels, func(h *notification.History) { + h.RuleEntryID = utils.ToDBInt(route.ID) + h.IncidentID = utils.ToDBInt(incidentID) + h.Message = utils.ToDBString(ev.Message) + if suppressed { + h.NotificationState = notification.StateSuppressed + } + }) + if err != nil { + r.Logger.Errorw("Failed to insert pending notification histories", + zap.Inline(route), zap.Bool("suppressed", suppressed), zap.Error(err)) + return err + } + + if !suppressed { + for contact, entries := range histories { + notifications[contact] = append(notifications[contact], entries...) + } + } + } + + return nil +} diff --git a/internal/icinga2/launcher.go b/internal/icinga2/launcher.go index becc9226..54849160 100644 --- a/internal/icinga2/launcher.go +++ b/internal/icinga2/launcher.go @@ -13,7 +13,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" - "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/events" "go.uber.org/zap" "net/http" "sync" @@ -129,7 +129,7 @@ func (launcher *Launcher) launch(src *config.Source) { CallbackFn: func(ev *event.Event) { l := logger.With(zap.Stringer("event", ev)) - err := incident.ProcessEvent(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev) + err := events.Process(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev) switch { case errors.Is(err, event.ErrSuperfluousStateChange): l.Debugw("Stopped processing event with superfluous state change", zap.Error(err)) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 5a399fc8..ade37205 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -78,6 +78,12 @@ func (i *Incident) String() string { return fmt.Sprintf("#%d", i.ID) } +// RefreshIsMuted refreshes the current incident isMuted flag. +// Please note that you always have to call this method while holding the incident lock. +func (i *Incident) RefreshIsMuted() { + i.isMuted = i.Object.IsMuted() +} + func (i *Incident) HasManager() bool { for recipientKey, state := range i.Recipients { if i.RuntimeConfig.GetRecipient(recipientKey) == nil { @@ -104,14 +110,16 @@ func (i *Incident) IsNotifiable(role ContactRole) bool { return role > RoleRecipient } -// ProcessEvent processes the given event for the current incident in an own transaction. -func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { - i.Lock() - defer i.Unlock() - - i.RuntimeConfig.RLock() - defer i.RuntimeConfig.RUnlock() - +// ProcessEvent processes the given event for the current incident. +// +// ProcessEvent will perform all the necessary actions for the current incident and execute any database queries +// within the provided transaction. However, this method does not trigger any notifications by itself and must be +// generated/triggered manually via the GenerateNotifications method. +// +// Please note that you always have to call this method while holding the incident and config.RuntimeConfig lock. +// +// Returns an error when it fails to successfully process the specified event. +func (i *Incident) ProcessEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as // such, we are ignoring superfluous ones that don't have any effect on that incident. @@ -123,22 +131,9 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return event.ErrSuperfluousMuteUnmuteEvent } - tx, err := i.DB.BeginTxx(ctx, nil) - if err != nil { - i.Logger.Errorw("Cannot start a db transaction", zap.Error(err)) - return err - } - defer func() { _ = tx.Rollback() }() - - if err = ev.Sync(ctx, tx, i.DB, i.Object.ID); err != nil { - i.Logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) - return err - } - isNew := i.StartedAt.Time().IsZero() if isNew { - err = i.processIncidentOpenedEvent(ctx, tx, ev) - if err != nil { + if err := i.processIncidentOpenedEvent(ctx, tx, ev); err != nil { return err } @@ -180,10 +175,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { // Re-evaluate escalations based on the newly evaluated rules. i.evaluateEscalations(ev.Time) - - if err := i.triggerEscalations(ctx, tx, ev); err != nil { - return err - } + return i.triggerEscalations(ctx, tx, ev) case event.TypeAcknowledgementSet: if err := i.processAcknowledgementEvent(ctx, tx, ev); err != nil { if errors.Is(err, errSuperfluousAckEvent) { @@ -196,20 +188,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } } - notifications, err := i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) - if err != nil { - return err - } - - if err = tx.Commit(); err != nil { - i.Logger.Errorw("Cannot commit db transaction", zap.Error(err)) - return err - } - - // We've just committed the DB transaction and can safely update the incident muted flag. - i.isMuted = i.Object.IsMuted() - - return i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications) + return nil } // RetriggerEscalations tries to re-evaluate the escalations and notify contacts. @@ -257,13 +236,13 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { channels.LoadFromEntryRecipients(escalation, ev.Time, i.isRecipientNotifiable) } - notifications, err = i.generateNotifications(ctx, tx, ev, channels) + notifications, err = i.GenerateNotifications(ctx, tx, ev, channels) return err }) if err != nil { i.Logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) } else { - if err = i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications); err != nil { + if err = i.NotifyContacts(ctx, i.MakeNotificationRequest(ev), notifications); err != nil { i.Logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) return } @@ -569,8 +548,8 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, return nil } -// getRecipientsChannel returns all the configured channels of the current incident and escalation recipients. -func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { +// GetRecipientsChannel returns all the configured channels of the current incident and escalation recipients. +func (i *Incident) GetRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 061d1cfe..cef99f6c 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -11,7 +11,6 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -196,67 +195,3 @@ func GetCurrentIncidents() map[int64]*Incident { } return m } - -// ProcessEvent from an event.Event. -// -// This function first gets this Event's object.Object and its incident.Incident. Then, after performing some safety -// checks, it calls the Incident.ProcessEvent method. -// -// The returned error might be wrapped around event.ErrSuperfluousStateChange. -func ProcessEvent( - ctx context.Context, - db *database.DB, - logs *logging.Logging, - runtimeConfig *config.RuntimeConfig, - ev *event.Event, -) error { - var wasObjectMuted bool - if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { - wasObjectMuted = obj.IsMuted() - } - - obj, err := object.FromEvent(ctx, db, ev) - if err != nil { - return fmt.Errorf("cannot sync event object: %w", err) - } - - createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK - currentIncident, err := GetCurrent( - ctx, - db, - obj, - logs.GetChildLogger("incident"), - runtimeConfig, - createIncident) - if err != nil { - return fmt.Errorf("cannot get current incident for %q: %w", obj.DisplayName(), err) - } - - if currentIncident == nil { - switch { - case ev.Severity == event.SeverityNone: - // We need to ignore superfluous mute and unmute events here, as would be the case with an existing - // incident, otherwise the event stream catch-up phase will generate useless events after each - // Icinga 2 reload and overwhelm the database with the very same mute/unmute events. - if wasObjectMuted && ev.Type == event.TypeMute { - return event.ErrSuperfluousMuteUnmuteEvent - } else if !wasObjectMuted && ev.Type == event.TypeUnmute { - return event.ErrSuperfluousMuteUnmuteEvent - } - - // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. - err = utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) - if err != nil { - return errors.New("cannot sync non-state event to the database") - } - - return nil - case ev.Severity != event.SeverityOK: - panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity)) - default: - return fmt.Errorf("%w: ok state event from source %d", event.ErrSuperfluousStateChange, ev.SourceId) - } - } - - return currentIncident.ProcessEvent(ctx, ev) -} diff --git a/internal/incident/sync.go b/internal/incident/sync.go index ccbb59ee..830fd9e3 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -124,12 +124,12 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule return err } -// generateNotifications generates incident notification histories of the given recipients. +// GenerateNotifications generates incident notification histories of the given recipients. // // This function will just insert notification.StateSuppressed incident histories and return an empty slice if // the current Object is muted, otherwise a slice of pending *NotificationEntry(ies) that can be used to update // the corresponding histories after the actual notifications have been sent out. -func (i *Incident) generateNotifications( +func (i *Incident) GenerateNotifications( ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels, ) (notification.PendingNotifications, error) { notifications, err := notification.AddNotifications(ctx, i.DB, tx, contactChannels, func(n *notification.History) { diff --git a/internal/incident/utils.go b/internal/incident/utils.go index 357aa0e2..a75ee997 100644 --- a/internal/incident/utils.go +++ b/internal/incident/utils.go @@ -10,9 +10,9 @@ import ( "net/url" ) -// makeNotificationRequest generates a *plugin.NotificationRequest for the provided event. +// MakeNotificationRequest generates a *plugin.NotificationRequest for the provided event. // Fails fatally when fails to parse the Icinga Web 2 url. -func (i *Incident) makeNotificationRequest(ev *event.Event) *plugin.NotificationRequest { +func (i *Incident) MakeNotificationRequest(ev *event.Event) *plugin.NotificationRequest { baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) if err != nil { i.Logger.Panicw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 7429f6e1..9b63d088 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -12,6 +12,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/events" "github.com/icinga/icinga-notifications/internal/incident" "go.uber.org/zap" "net/http" @@ -133,7 +134,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } l.logger.Infow("Processing event", zap.String("event", ev.String())) - err = incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) + err = events.Process(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) { abort(http.StatusNotAcceptable, &ev, "%v", err) return