Skip to content

Commit

Permalink
WIP!
Browse files Browse the repository at this point in the history
  • Loading branch information
sukhwinder33445 committed Jan 4, 2024
1 parent 055f298 commit 5c34818
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 55 deletions.
26 changes: 26 additions & 0 deletions internal/common/history_event_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"database/sql/driver"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
)

type HistoryEventType int
Expand Down Expand Up @@ -91,3 +92,28 @@ func (h HistoryEventType) Value() (driver.Value, error) {
func (h *HistoryEventType) String() string {
return historyEventTypeToName[*h]
}

func GetHistoryEventType(eventType string) (HistoryEventType, error) {
var historyEvType HistoryEventType
switch eventType {
case event.TypeDowntimeStart:
historyEvType = DowntimeStarted
case event.TypeDowntimeEnd:
historyEvType = DowntimeEnded
case event.TypeDowntimeCancelled:
historyEvType = DowntimeCancelled
case event.TypeFlappingStart:
historyEvType = FlappingStarted
case event.TypeFlappingEnd:
historyEvType = FlappingEnded
case event.TypeCustom:
historyEvType = Custom
case event.TypeCommentAdded:
historyEvType = CommentAdded
default:
//TODO: other events
return historyEvType, fmt.Errorf("type %s not implemented yet", eventType)
}

return historyEvType, nil
}
128 changes: 108 additions & 20 deletions internal/eventhandler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,6 @@ func (n *Notification) TableName() string {
return "history"
}

/*type IncidentNotification struct {
*Notification
}
// TableName implements the contracts.TableNamer interface.
func (in *IncidentNotification) TableName() string {
return "incident_history"
}*/

func NewEventHandler(
db *icingadb.DB, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, obj *object.Object, ev *event.Event, i *incident.Incident,
) *EventHandler {
Expand Down Expand Up @@ -180,6 +171,26 @@ func (eh *EventHandler) handle(ctx context.Context) error {
eh.timer.Stop()
eh.timer = nil
}
} else {
historyEvType, err := common.GetHistoryEventType(ev.Type)
if err != nil {
return err
}

hr := &incident.HistoryRow{
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(time.Now()),
Type: historyEvType,
Message: utils.ToDBString(ev.Message),
}

// TODO: do we need causedby here? for rule_matched, etc entries
causedBy, err = eh.addNonIncidentHistory(ctx, tx, hr, true)
if err != nil {
eh.logger.Errorw("Failed to add non-incident history", zap.String("type", historyEvType.String()), zap.Error(err))

return fmt.Errorf("failed to add %s non-incident history", historyEvType.String())
}
}

// Check if any (additional) rules match this object. Filters of rules that already have a state don't have
Expand Down Expand Up @@ -250,7 +261,21 @@ func (eh *EventHandler) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID
return causedBy, err
}
} else {
// non state handling here
//TODO: NON-incident history entry
history := &incident.HistoryRow{
Time: types.UnixMilli(time.Now()),
EventID: utils.ToDBInt(eventID),
RuleID: utils.ToDBInt(r.ID),
Type: common.RuleMatched,
CausedByHistoryID: causedBy,
}

causedBy, err = eh.addNonIncidentHistory(ctx, tx, history, true) //TODO: fethedId??
if err != nil {
eh.logger.Errorw("Failed to insert rule matched non-incident history", zap.String("rule", r.Name), zap.Error(err))

return types.Int{}, fmt.Errorf("failed to insert rule matched non-incident history")
}
}
}
}
Expand Down Expand Up @@ -424,6 +449,11 @@ func (eh *EventHandler) EvaluateIncidentEscalations(ev *event.Event) ([]*rule.Es
}
}

//TODO: test fails: Add a non-state escalation with comment_added filter
// Add a escalation rule incident age 30s.. set host to down, and a comment before 30 sec.. see, the logs..
// Yes, comment notifications reevaluate the time based escalation and it is then never triggred..

//TODO: this must be handled differently. the returned escalations should not contain escalations that have a retryAfter
if matched || isNonStateEscForAll {
escalations = append(escalations, escalation.EscalationTemplate)
}
Expand Down Expand Up @@ -467,7 +497,29 @@ func (eh *EventHandler) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev
} else {
//TODO: non incident
eh.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())
err := eh.AddRecipient(ctx, tx, escalation, ev.ID)

history := &incident.HistoryRow{
Time: types.UnixMilli(time.Now()),
EventID: utils.ToDBInt(ev.ID),
RuleNonStateEscalationID: utils.ToDBInt(escalation.ID),
RuleID: utils.ToDBInt(r.ID),
Type: common.EscalationTriggered,
CausedByHistoryID: causedBy,
}

var err error
causedBy, err = eh.addNonIncidentHistory(ctx, tx, history, false)
if err != nil {
eh.logger.Errorw(
"Failed to insert non-state-escalation triggered non-incident history",
zap.String("rule", r.Name),
zap.String("escalation",
escalation.DisplayName()), zap.Error(err))

return fmt.Errorf("failed to insert non-state-escalation triggered non-incident history")
}

err = eh.AddRecipient(ctx, tx, escalation, ev.ID)
if err != nil {
return err
}
Expand All @@ -480,7 +532,7 @@ func (eh *EventHandler) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev
// AddRecipient adds recipient from the given *rule.Escalation to this incident.
// Syncs also all the recipients with the database and returns an error on db failure.
func (eh *EventHandler) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.EscalationTemplate, eventId int64) error {
//TODO: Handling required, currently nothing happens without incident
//TODO: either trigger i.AddRecipient here or remove unused ctx,tx params
for _, escalationRecipient := range escalation.Recipients {
r := escalationRecipient.Recipient

Expand All @@ -501,6 +553,7 @@ func (eh *EventHandler) getRecipientsChannel() incident.ContactChannels {
i := eh.Incident
if i != nil {
contactChs = i.GetRecipientsChannel(eventTime)
println("sd contCHs", len(contactChs), eh.Event.Type, i.Severity.String())
}

// Check whether all the incident recipients do have an appropriate contact channel configured.
Expand Down Expand Up @@ -538,8 +591,27 @@ func (eh *EventHandler) addPendingNotifications(ctx context.Context, tx *sqlx.Tx
return nil, err
}
} else {
//TODO: handling required for non-incident
historyId = 9898
hr := &incident.HistoryRow{
Key: recipient.ToKey(contact),
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(time.Now()),
Type: common.Notified,
ChannelID: utils.ToDBInt(chID),
CausedByHistoryID: causedBy,
NotificationState: common.NotificationStatePending,
}

id, err := eh.addNonIncidentHistory(ctx, tx, hr, true)
if err != nil {
eh.logger.Errorw(
"Failed to insert contact pending notification non-incident history",
zap.String("contact", contact.String()),
zap.Error(err))

return nil, fmt.Errorf("failed to insert contact pending notification non-incident history")
}

historyId = id.Int64
}

notifications = append(notifications, &Notification{
Expand All @@ -554,6 +626,26 @@ func (eh *EventHandler) addPendingNotifications(ctx context.Context, tx *sqlx.Tx
return notifications, nil
}

func (eh *EventHandler) addNonIncidentHistory(ctx context.Context, tx *sqlx.Tx, historyRow *incident.HistoryRow, fetchId bool) (types.Int, error) {
historyRow.ObjectID = eh.Object.ID
stmt := utils.BuildInsertStmtWithout(eh.db, historyRow, "id")
if fetchId {
historyId, err := utils.InsertAndFetchId(ctx, tx, stmt, historyRow)
if err != nil {
return types.Int{}, err
}

return utils.ToDBInt(historyId), nil
} else {
_, err := tx.NamedExecContext(ctx, stmt, historyRow)
if err != nil {
return types.Int{}, err
}
}

return types.Int{}, nil
}

func (eh *EventHandler) notifyContacts(ctx context.Context, ev *event.Event, notifications []*Notification) error {
for _, n := range notifications {
contact := eh.runtimeConfig.Contacts[n.ContactID]
Expand All @@ -575,19 +667,15 @@ func (eh *EventHandler) notifyContacts(ctx context.Context, ev *event.Event, not
}

func (eh *EventHandler) addNotifiedHistory(ctx context.Context, n *Notification, contactStr string) error {
var notifiedHistory interface{}

notifiedHistory = n

/*if eh.Incident != nil {
notifiedHistory = &IncidentNotification{n}
} else {
//TODO: history for non-incident notifications
return nil
}*/

stmt, _ := eh.db.BuildUpdateStmt(notifiedHistory)
if _, err := eh.db.NamedExecContext(ctx, stmt, notifiedHistory); err != nil {
stmt, _ := eh.db.BuildUpdateStmt(n)
if _, err := eh.db.NamedExecContext(ctx, stmt, n); err != nil {
eh.logger.Errorw(
"Failed to update contact notified history", zap.String("contact", contactStr),
zap.Error(err),
Expand Down
59 changes: 24 additions & 35 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,28 +249,33 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx,
// TriggerEscalation triggers the given escalations and generates incident history items for each of them.
// Returns an error on database failure.
func (i *Incident) TriggerEscalation(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int, escalation *rule.EscalationTemplate, r *rule.Rule) error {
i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())

state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())}
i.EscalationState[escalation.ID] = state

if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
i.logger.Errorw(
"Failed to upsert escalation state", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)
return errors.New("failed to upsert escalation state")
}

history := &HistoryRow{
Time: state.TriggeredAt,
Time: types.UnixMilli(time.Now()),
EventID: utils.ToDBInt(ev.ID),
RuleEscalationID: utils.ToDBInt(state.RuleEscalationID),
RuleID: utils.ToDBInt(r.ID),
Type: common.EscalationTriggered,
CausedByHistoryID: causedBy,
}

i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName())
if ev.Type == event.TypeState {
history.RuleEscalationID = utils.ToDBInt(escalation.ID)

state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: history.Time}
i.EscalationState[escalation.ID] = state

if err := i.AddEscalationTriggered(ctx, tx, state); err != nil {
i.logger.Errorw(
"Failed to upsert escalation state", zap.String("rule", r.Name),
zap.String("escalation", escalation.DisplayName()), zap.Error(err),
)

return errors.New("failed to upsert escalation state")
}
} else {
history.RuleNonStateEscalationID = utils.ToDBInt(escalation.ID)
}

if _, err := i.AddHistory(ctx, tx, history, false); err != nil {
i.logger.Errorw(
"Failed to insert escalation triggered incident history", zap.String("rule", r.Name),
Expand All @@ -292,25 +297,9 @@ func (i *Incident) processNonStateTypeEvent(ctx context.Context, tx *sqlx.Tx, ev
return i.processAcknowledgementEvent(ctx, tx, ev)
}

var historyEvType common.HistoryEventType
switch ev.Type {
case event.TypeDowntimeStart:
historyEvType = common.DowntimeStarted
case event.TypeDowntimeEnd:
historyEvType = common.DowntimeEnded
case event.TypeDowntimeCancelled:
historyEvType = common.DowntimeCancelled
case event.TypeFlappingStart:
historyEvType = common.FlappingStarted
case event.TypeFlappingEnd:
historyEvType = common.FlappingEnded
case event.TypeCustom:
historyEvType = common.Custom
case event.TypeCommentAdded:
historyEvType = common.CommentAdded
default:
//TODO: other events
return fmt.Errorf("type %s not implemented yet", ev.Type)
historyEvType, err := common.GetHistoryEventType(ev.Type)
if err != nil {
return err
}

hr := &HistoryRow{
Expand All @@ -320,7 +309,7 @@ func (i *Incident) processNonStateTypeEvent(ctx context.Context, tx *sqlx.Tx, ev
Message: utils.ToDBString(ev.Message),
}

_, err := i.AddHistory(ctx, tx, hr, false)
_, err = i.AddHistory(ctx, tx, hr, false)
if err != nil {
i.logger.Errorw("Failed to add incident history", zap.String("type", historyEvType.String()), zap.Error(err))

Expand Down

0 comments on commit 5c34818

Please sign in to comment.