diff --git a/internal/channel/channel.go b/internal/channel/channel.go index c5a5c86d7..9db8b68d8 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/pkg/plugin" + "github.com/icinga/icingadb/pkg/types" "go.uber.org/zap" "net/url" ) @@ -18,6 +19,9 @@ type Channel struct { Type string `db:"type"` Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` + Logger *zap.SugaredLogger restartCh chan newConfig diff --git a/internal/config/channel.go b/internal/config/channel.go index 769a919f3..881e134d4 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -3,49 +3,47 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/channel" + "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { var channelPtr *channel.Channel - stmt := r.db.BuildSelectStmt(channelPtr, channelPtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[utils.TableName(channelPtr)] + stmt := r.buildSelectStmtWhereChangedAt(channelPtr) + r.logger.Debugw("Executing query to fetch channels", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var channels []*channel.Channel - if err := tx.SelectContext(ctx, &channels, stmt); err != nil { + if err := tx.SelectContext(ctx, &channels, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } channelsById := make(map[int64]*channel.Channel) for _, c := range channels { + changedAt = c.ChangedAt + channelLogger := r.logger.With( zap.Int64("id", c.ID), zap.String("name", c.Name), zap.String("type", c.Type), - ) - if channelsById[c.ID] != nil { - channelLogger.Warnw("ignoring duplicate config for channel type") + zap.Time("changed_at", c.ChangedAt.Time())) + + if c.IsDeleted.Valid && c.IsDeleted.Bool { + channelsById[c.ID] = nil + channelLogger.Debug("Marking channel as deleted") } else if err := channel.ValidateType(c.Type); err != nil { channelLogger.Errorw("Cannot load channel config", zap.Error(err)) } else { channelsById[c.ID] = c - - channelLogger.Debugw("loaded channel config") - } - } - - if r.Channels != nil { - // mark no longer existing channels for deletion - for id := range r.Channels { - if _, ok := channelsById[id]; !ok { - channelsById[id] = nil - } + channelLogger.Debug("Loaded channel config") } } r.pending.Channels = channelsById + r.pendingLastChange[utils.TableName(channelPtr)] = changedAt return nil } @@ -56,28 +54,30 @@ func (r *RuntimeConfig) applyPendingChannels() { } for id, pendingChannel := range r.pending.Channels { + existingChannel, existingChannelOk := r.Channels[id] if pendingChannel == nil { - r.Channels[id].Logger.Info("Channel has been removed") - r.Channels[id].Stop() + if !existingChannelOk { + continue + } + existingChannel.Logger.Info("Channel has been removed") + existingChannel.Stop() delete(r.Channels, id) - } else if currentChannel := r.Channels[id]; currentChannel != nil { - // Currently, the whole config is reloaded from the database frequently, replacing everything. - // Prevent restarting the plugin processes every time by explicitly checking for config changes. - // The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5 - // is solved properly. - if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config { - currentChannel.Type = pendingChannel.Type - currentChannel.Name = pendingChannel.Name - currentChannel.Config = pendingChannel.Config - - currentChannel.Restart() - } + } else if existingChannelOk { + existingChannel.Logger.Info("Channel has been updated") + + existingChannel.Name = pendingChannel.Name + existingChannel.Type = pendingChannel.Type + existingChannel.Config = pendingChannel.Config + + existingChannel.Restart() } else { pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With( zap.Int64("id", pendingChannel.ID), zap.String("name", pendingChannel.Name))) + pendingChannel.Logger.Info("Channel has been added") + r.Channels[id] = pendingChannel } } diff --git a/internal/config/contact.go b/internal/config/contact.go index bb72efad4..8807fde4b 100644 --- a/internal/config/contact.go +++ b/internal/config/contact.go @@ -3,40 +3,44 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error { var contactPtr *recipient.Contact - stmt := r.db.BuildSelectStmt(contactPtr, contactPtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[utils.TableName(contactPtr)] + stmt := r.buildSelectStmtWhereChangedAt(contactPtr) + r.logger.Debugw("Executing query to fetch contacts", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var contacts []*recipient.Contact - if err := tx.SelectContext(ctx, &contacts, stmt); err != nil { + if err := tx.SelectContext(ctx, &contacts, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } contactsByID := make(map[int64]*recipient.Contact) for _, c := range contacts { - contactsByID[c.ID] = c + changedAt = c.ChangedAt - r.logger.Debugw("loaded contact config", + contactLogger := r.logger.With( zap.Int64("id", c.ID), - zap.String("name", c.FullName)) - } + zap.String("name", c.FullName), + zap.Time("changed_at", c.ChangedAt.Time())) - if r.Contacts != nil { - // mark no longer existing contacts for deletion - for id := range r.Contacts { - if _, ok := contactsByID[id]; !ok { - contactsByID[id] = nil - } + if c.IsDeleted.Valid && c.IsDeleted.Bool { + contactsByID[c.ID] = nil + contactLogger.Debug("Marking contact as deleted") + } else { + contactsByID[c.ID] = c + contactLogger.Debug("Loaded contact config") } } r.pending.Contacts = contactsByID + r.pendingLastChange[utils.TableName(contactPtr)] = changedAt return nil } @@ -47,13 +51,25 @@ func (r *RuntimeConfig) applyPendingContacts() { } for id, pendingContact := range r.pending.Contacts { + contactLogger := r.logger.With(zap.Int64("id", id)) + currentContact, currentContactOk := r.Contacts[id] + if pendingContact == nil { + if !currentContactOk { + continue + } + contactLogger.Info("Contact has been removed") + delete(r.Contacts, id) - } else if currentContact := r.Contacts[id]; currentContact != nil { + } else if currentContactOk { + contactLogger.Info("Contact has been updated") + currentContact.FullName = pendingContact.FullName currentContact.Username = pendingContact.Username currentContact.DefaultChannelID = pendingContact.DefaultChannelID } else { + contactLogger.Info("Contact has been added") + r.Contacts[id] = pendingContact } } diff --git a/internal/config/contact_address.go b/internal/config/contact_address.go index f89f82f0a..6639c934b 100644 --- a/internal/config/contact_address.go +++ b/internal/config/contact_address.go @@ -3,6 +3,7 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" "slices" @@ -10,36 +11,39 @@ import ( func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error { var addressPtr *recipient.Address - stmt := r.db.BuildSelectStmt(addressPtr, addressPtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[utils.TableName(addressPtr)] + stmt := r.buildSelectStmtWhereChangedAt(addressPtr) + r.logger.Debugw("Executing query to fetch contact addresses", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var addresses []*recipient.Address - if err := tx.SelectContext(ctx, &addresses, stmt); err != nil { + if err := tx.SelectContext(ctx, &addresses, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } addressesById := make(map[int64]*recipient.Address) for _, a := range addresses { - addressesById[a.ID] = a - r.logger.Debugw("loaded contact_address config", + changedAt = a.ChangedAt + + addressLogger := r.logger.With( zap.Int64("id", a.ID), zap.Int64("contact_id", a.ContactID), zap.String("type", a.Type), zap.String("address", a.Address), - ) - } + zap.Time("changed_at", a.ChangedAt.Time())) - if r.ContactAddresses != nil { - // mark no longer existing contacts for deletion - for id := range r.ContactAddresses { - if _, ok := addressesById[id]; !ok { - addressesById[id] = nil - } + if a.IsDeleted.Valid && a.IsDeleted.Bool { + addressesById[a.ID] = nil + addressLogger.Debug("Marking contact address as deleted") + } else { + addressesById[a.ID] = a + addressLogger.Debug("Loaded contact address config") } } r.pending.ContactAddresses = addressesById + r.pendingLastChange[utils.TableName(addressPtr)] = changedAt return nil } @@ -50,11 +54,14 @@ func (r *RuntimeConfig) applyPendingContactAddresses() { } for id, pendingAddress := range r.pending.ContactAddresses { - currentAddress := r.ContactAddresses[id] + currentAddress, currentAddressOk := r.ContactAddresses[id] if pendingAddress == nil { + if !currentAddressOk { + continue + } r.removeContactAddress(currentAddress) - } else if currentAddress != nil { + } else if currentAddressOk { r.updateContactAddress(currentAddress, pendingAddress) } else { r.addContactAddress(pendingAddress) diff --git a/internal/config/group.go b/internal/config/group.go index 433162aaf..75d990b16 100644 --- a/internal/config/group.go +++ b/internal/config/group.go @@ -3,28 +3,41 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/internal/utils" + dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { var groupPtr *recipient.Group - stmt := r.db.BuildSelectStmt(groupPtr, groupPtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[dbutils.TableName(groupPtr)] + stmt := r.buildSelectStmtWhereChangedAt(groupPtr) + r.logger.Debugw("Executing query to fetch groups", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var groups []*recipient.Group - if err := tx.SelectContext(ctx, &groups, stmt); err != nil { + if err := tx.SelectContext(ctx, &groups, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } groupsById := make(map[int64]*recipient.Group) for _, g := range groups { - groupsById[g.ID] = g + changedAt = g.ChangedAt - r.logger.Debugw("loaded group config", + groupLogger := r.logger.With( zap.Int64("id", g.ID), - zap.String("name", g.Name)) + zap.String("name", g.Name), + zap.Time("changed_at", g.ChangedAt.Time())) + + if g.IsDeleted.Valid && g.IsDeleted.Bool { + groupsById[g.ID] = nil + groupLogger.Debug("Marking group as deleted") + } else { + groupsById[g.ID] = g + groupLogger.Debug("Loaded group config") + } } type ContactgroupMember struct { @@ -32,12 +45,16 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { ContactId int64 `db:"contact_id"` } - var memberPtr *ContactgroupMember - stmt = r.db.BuildSelectStmt(memberPtr, memberPtr) - r.logger.Debugf("Executing query %q", stmt) - var members []*ContactgroupMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { + err := r.selectRelationshipTableEntries( + ctx, + tx, + new(ContactgroupMember), + "contactgroup_id", + utils.MapKeys(groupsById), + r.Groups == nil, + &members) + if err != nil { r.logger.Errorln(err) return err } @@ -48,26 +65,21 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { zap.Int64("contactgroup_id", m.GroupId), ) - if g := groupsById[m.GroupId]; g == nil { - memberLogger.Warnw("ignoring member for unknown contactgroup_id") - } else { - g.MemberIDs = append(g.MemberIDs, m.ContactId) - - memberLogger.Debugw("loaded contact group member", - zap.String("contactgroup_name", g.Name)) + g, ok := groupsById[m.GroupId] + if !ok { + memberLogger.Warn("Ignoring member for unknown contactgroup_id") + continue + } else if g == nil { + memberLogger.Debug("Skipping deleted member for unknown contactgroup_id") + continue } - } - if r.Groups != nil { - // mark no longer existing groups for deletion - for id := range r.Groups { - if _, ok := groupsById[id]; !ok { - groupsById[id] = nil - } - } + g.MemberIDs = append(g.MemberIDs, m.ContactId) + memberLogger.Info("Loaded contact group member", zap.String("contactgroup_name", g.Name)) } r.pending.Groups = groupsById + r.pendingLastChange[dbutils.TableName(groupPtr)] = changedAt return nil } diff --git a/internal/config/rule.go b/internal/config/rule.go index 2ebb9c97d..66aa5d2ab 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -5,52 +5,68 @@ import ( "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" + dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { var rulePtr *rule.Rule - stmt := r.db.BuildSelectStmt(rulePtr, rulePtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[dbutils.TableName(rulePtr)] + stmt := r.buildSelectStmtWhereChangedAt(rulePtr) + r.logger.Debugw("Executing query to fetch rules", + zap.String("query", stmt), + zap.Time("changed_at_after", changedAt.Time())) var rules []*rule.Rule - if err := tx.SelectContext(ctx, &rules, stmt); err != nil { + if err := tx.SelectContext(ctx, &rules, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } rulesByID := make(map[int64]*rule.Rule) for _, ru := range rules { + changedAt = ru.ChangedAt + ruleLogger := r.logger.With( zap.Int64("id", ru.ID), zap.String("name", ru.Name), zap.String("object_filter", ru.ObjectFilterExpr.String), zap.Int64("timeperiod_id", ru.TimePeriodID.Int64), - ) + zap.Time("changed_at", ru.ChangedAt.Time())) - if ru.ObjectFilterExpr.Valid { - f, err := filter.Parse(ru.ObjectFilterExpr.String) - if err != nil { - ruleLogger.Warnw("ignoring rule as parsing object_filter failed", zap.Error(err)) - continue - } + if ru.IsDeleted.Valid && ru.IsDeleted.Bool { + rulesByID[ru.ID] = nil + ruleLogger.Debug("Marking rule as deleted") + } else { + if ru.ObjectFilterExpr.Valid { + f, err := filter.Parse(ru.ObjectFilterExpr.String) + if err != nil { + rulesByID[ru.ID] = nil + ruleLogger.Warnw("Ignoring rule as parsing object_filter failed", zap.Error(err)) + continue + } - ru.ObjectFilter = f - } + ru.ObjectFilter = f + } - ru.Escalations = make(map[int64]*rule.Escalation) + ru.Escalations = make(map[int64]*rule.Escalation) - rulesByID[ru.ID] = ru - ruleLogger.Debugw("loaded rule config") + rulesByID[ru.ID] = ru + ruleLogger.Debug("Loaded rule config") + } } - var escalationPtr *rule.Escalation - stmt = r.db.BuildSelectStmt(escalationPtr, escalationPtr) - r.logger.Debugf("Executing query %q", stmt) - var escalations []*rule.Escalation - if err := tx.SelectContext(ctx, &escalations, stmt); err != nil { + err := r.selectRelationshipTableEntries( + ctx, + tx, + new(rule.Escalation), + "rule_id", + utils.MapKeys(rulesByID), + r.Rules == nil, + &escalations) + if err != nil { r.logger.Errorln(err) return err } @@ -65,9 +81,12 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { zap.Int64("fallback_for", escalation.FallbackForID.Int64), ) - rule := rulesByID[escalation.RuleID] - if rule == nil { - escalationLogger.Warnw("ignoring escalation for unknown rule_id") + r, ok := rulesByID[escalation.RuleID] + if !ok { + escalationLogger.Warn("Ignoring escalation for unknown rule_id") + continue + } else if r == nil { + escalationLogger.Debug("Skipping deleted escalation for unknown rule_id") continue } @@ -91,17 +110,21 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { escalation.Name = escalation.NameRaw.String } - rule.Escalations[escalation.ID] = escalation + r.Escalations[escalation.ID] = escalation escalationsByID[escalation.ID] = escalation escalationLogger.Debugw("loaded escalation config") } - var recipientPtr *rule.EscalationRecipient - stmt = r.db.BuildSelectStmt(recipientPtr, recipientPtr) - r.logger.Debugf("Executing query %q", stmt) - var recipients []*rule.EscalationRecipient - if err := tx.SelectContext(ctx, &recipients, stmt); err != nil { + err = r.selectRelationshipTableEntries( + ctx, + tx, + new(rule.EscalationRecipient), + "rule_escalation_id", + utils.MapKeys(escalationsByID), + r.Rules == nil, + &recipients) + if err != nil { r.logger.Errorln(err) return err } @@ -113,24 +136,17 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { zap.Int64("channel_id", recipient.ChannelID.Int64)) escalation := escalationsByID[recipient.EscalationID] + // In contrary to similar code snippets, this should not be able to contain nil elements. if escalation == nil { - recipientLogger.Warnw("ignoring recipient for unknown escalation") + recipientLogger.Warn("Ignoring recipient for unknown escalation") } else { escalation.Recipients = append(escalation.Recipients, recipient) - recipientLogger.Debugw("loaded escalation recipient config") - } - } - - if r.Rules != nil { - // mark no longer existing rules for deletion - for id := range r.Rules { - if _, ok := rulesByID[id]; !ok { - rulesByID[id] = nil - } + recipientLogger.Debug("Loaded escalation recipient config") } } r.pending.Rules = rulesByID + r.pendingLastChange[dbutils.TableName(rulePtr)] = changedAt return nil } diff --git a/internal/config/runtime.go b/internal/config/runtime.go index acd744038..a53d2a73b 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -10,6 +10,7 @@ import ( "github.com/icinga/icinga-notifications/internal/timeperiod" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" "github.com/jmoiron/sqlx" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" @@ -30,7 +31,9 @@ type RuntimeConfig struct { EventStreamLaunchFunc func(source *Source) // pending contains changes to config objects that are to be applied to the embedded live config. - pending ConfigSet + pending *ConfigSet + // pendingLastChange holds the changed_at timestamp for incremental config updates. + pendingLastChange map[string]types.UnixMilli logs *logging.Logging logger *logging.Logger @@ -48,6 +51,8 @@ func NewRuntimeConfig( return &RuntimeConfig{ EventStreamLaunchFunc: esLaunch, + pendingLastChange: make(map[string]types.UnixMilli), + logs: logs, logger: logs.GetChildLogger("runtime-updates"), db: db, @@ -66,6 +71,9 @@ type ConfigSet struct { } func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error { + r.pending = &ConfigSet{} + defer func() { r.pending = nil }() + err := r.fetchFromDatabase(ctx) if err != nil { return err @@ -204,9 +212,6 @@ func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { r.logger.Debug("fetching configuration from database") start := time.Now() - // Reset all pending state to start from a clean state. - r.pending = ConfigSet{} - tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, diff --git a/internal/config/schedule.go b/internal/config/schedule.go index b43af6475..d186637b7 100644 --- a/internal/config/schedule.go +++ b/internal/config/schedule.go @@ -3,36 +3,53 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/internal/utils" + dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { var schedulePtr *recipient.Schedule - stmt := r.db.BuildSelectStmt(schedulePtr, schedulePtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[dbutils.TableName(schedulePtr)] + stmt := r.buildSelectStmtWhereChangedAt(schedulePtr) + r.logger.Debugw("Executing query to fetch schedule", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var schedules []*recipient.Schedule - if err := tx.SelectContext(ctx, &schedules, stmt); err != nil { + if err := tx.SelectContext(ctx, &schedules, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } schedulesById := make(map[int64]*recipient.Schedule) - for _, g := range schedules { - schedulesById[g.ID] = g + for _, s := range schedules { + changedAt = s.ChangedAt - r.logger.Debugw("loaded schedule config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } + scheduleLogger := r.logger.With( + zap.Int64("id", s.ID), + zap.String("name", s.Name), + zap.Time("changed_at", s.ChangedAt.Time())) - var memberPtr *recipient.ScheduleMemberRow - stmt = r.db.BuildSelectStmt(memberPtr, memberPtr) - r.logger.Debugf("Executing query %q", stmt) + if s.IsDeleted.Valid && s.IsDeleted.Bool { + schedulesById[s.ID] = nil + scheduleLogger.Debug("Marking schedule as deleted") + } else { + schedulesById[s.ID] = s + scheduleLogger.Debug("Loaded schedule config") + } + } var members []*recipient.ScheduleMemberRow - if err := tx.SelectContext(ctx, &members, stmt); err != nil { + err := r.selectRelationshipTableEntries( + ctx, + tx, + new(recipient.ScheduleMemberRow), + "schedule_id", + utils.MapKeys(schedulesById), + r.Schedules == nil, + &members) + if err != nil { r.logger.Errorln(err) return err } @@ -40,25 +57,21 @@ func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { for _, member := range members { memberLogger := makeScheduleMemberLogger(r.logger.SugaredLogger, member) - if s := schedulesById[member.ScheduleID]; s == nil { - memberLogger.Warnw("ignoring schedule member for unknown schedule_id") - } else { - s.MemberRows = append(s.MemberRows, member) - - memberLogger.Debugw("member") + s, ok := schedulesById[member.ScheduleID] + if !ok { + memberLogger.Warn("Ignoring entry for unknown schedule_id") + continue + } else if s == nil { + memberLogger.Debug("Skipping deleted entry for unknown schedule_id") + continue } - } - if r.Schedules != nil { - // mark no longer existing schedules for deletion - for id := range r.Schedules { - if _, ok := schedulesById[id]; !ok { - schedulesById[id] = nil - } - } + s.MemberRows = append(s.MemberRows, member) + memberLogger.Debug("Member") } r.pending.Schedules = schedulesById + r.pendingLastChange[dbutils.TableName(schedulePtr)] = changedAt return nil } diff --git a/internal/config/source.go b/internal/config/source.go index fc32e34f5..0ea092e88 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -3,6 +3,7 @@ package config import ( "context" "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) @@ -27,6 +28,9 @@ type Source struct { // Icinga2SourceConf for Event Stream API sources, only if Source.Type == SourceTypeIcinga2. Icinga2SourceCancel context.CancelFunc `db:"-" json:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } // fieldEquals checks if this Source's database fields are equal to those of another Source. @@ -56,41 +60,38 @@ func (source *Source) stop() { func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error { var sourcePtr *Source - stmt := r.db.BuildSelectStmt(sourcePtr, sourcePtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[utils.TableName(sourcePtr)] + stmt := r.buildSelectStmtWhereChangedAt(sourcePtr) + r.logger.Debugw("Executing query to fetch sources", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var sources []*Source - if err := tx.SelectContext(ctx, &sources, stmt); err != nil { + if err := tx.SelectContext(ctx, &sources, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } sourcesById := make(map[int64]*Source) for _, s := range sources { + changedAt = s.ChangedAt + sourceLogger := r.logger.With( zap.Int64("id", s.ID), zap.String("name", s.Name), zap.String("type", s.Type), - ) - if sourcesById[s.ID] != nil { - sourceLogger.Error("Ignoring duplicate config for source ID") - continue - } + zap.Time("changed_at", s.ChangedAt.Time())) - sourcesById[s.ID] = s - sourceLogger.Debug("loaded source config") - } - - if r.Sources != nil { - // mark no longer existing sources for deletion - for id := range r.Sources { - if _, ok := sourcesById[id]; !ok { - sourcesById[id] = nil - } + if s.IsDeleted.Valid && s.IsDeleted.Bool { + sourcesById[s.ID] = nil + sourceLogger.Debug("Marking channel as deleted") + } else { + sourcesById[s.ID] = s + sourceLogger.Debug("Loaded source config") } } r.pending.Sources = sourcesById + r.pendingLastChange[utils.TableName(sourcePtr)] = changedAt return nil } @@ -102,28 +103,27 @@ func (r *RuntimeConfig) applyPendingSources() { for id, pendingSource := range r.pending.Sources { logger := r.logger.With(zap.Int64("id", id)) - currentSource := r.Sources[id] + currentSource, currentSourceOk := r.Sources[id] // Compare the pending source with an optional existing source; instruct the Event Source Client, if necessary. - if pendingSource == nil && currentSource != nil { + if pendingSource == nil { + if !currentSourceOk { + continue + } logger.Info("Source has been removed") currentSource.stop() delete(r.Sources, id) continue - } else if pendingSource != nil && currentSource != nil { + } else if currentSourceOk { if currentSource.fieldEquals(pendingSource) { continue } logger.Info("Source has been updated") currentSource.stop() - } else if pendingSource != nil && currentSource == nil { - logger.Info("Source has been added") } else { - // Neither an active nor a pending source? - logger.Error("Cannot applying pending configuration: neither an active nor a pending source") - continue + logger.Info("Source has been added") } if pendingSource.Type == SourceTypeIcinga2 { diff --git a/internal/config/timeperiod.go b/internal/config/timeperiod.go index 9acb6576b..e08e7d8e2 100644 --- a/internal/config/timeperiod.go +++ b/internal/config/timeperiod.go @@ -5,7 +5,9 @@ import ( "database/sql" "fmt" "github.com/icinga/icinga-notifications/internal/timeperiod" + "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/types" + dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" "time" @@ -13,17 +15,33 @@ import ( func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error { var timePeriodPtr *timeperiod.TimePeriod - stmt := r.db.BuildSelectStmt(timePeriodPtr, timePeriodPtr) - r.logger.Debugf("Executing query %q", stmt) + changedAt := r.pendingLastChange[dbutils.TableName(timePeriodPtr)] + stmt := r.buildSelectStmtWhereChangedAt(timePeriodPtr) + r.logger.Debugw("Executing query to fetch time periods", + zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) var timePeriods []*timeperiod.TimePeriod - if err := tx.SelectContext(ctx, &timePeriods, stmt); err != nil { + if err := tx.SelectContext(ctx, &timePeriods, stmt, changedAt); err != nil { r.logger.Errorln(err) return err } + timePeriodsById := make(map[int64]*timeperiod.TimePeriod) - for _, period := range timePeriods { - timePeriodsById[period.ID] = period + for _, p := range timePeriods { + changedAt = p.ChangedAt + + timePeriodLogger := r.logger.With( + zap.Int64("id", p.ID), + zap.String("name", p.Name), + zap.Time("changed_at", p.ChangedAt.Time())) + + if p.IsDeleted.Valid && p.IsDeleted.Bool { + timePeriodsById[p.ID] = nil + timePeriodLogger.Debug("Marking time period as deleted") + } else { + timePeriodsById[p.ID] = p + timePeriodLogger.Debug("Load time period config") + } } type TimeperiodEntry struct { @@ -36,22 +54,31 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error Description sql.NullString `db:"description"` } - var entryPtr *TimeperiodEntry - stmt = r.db.BuildSelectStmt(entryPtr, entryPtr) - r.logger.Debugf("Executing query %q", stmt) - var entries []*TimeperiodEntry - if err := tx.SelectContext(ctx, &entries, stmt); err != nil { + err := r.selectRelationshipTableEntries( + ctx, + tx, + new(TimeperiodEntry), + "timeperiod_id", + utils.MapKeys(timePeriodsById), + r.TimePeriods == nil, + &entries) + if err != nil { r.logger.Errorln(err) return err } for _, row := range entries { - p := timePeriodsById[row.TimePeriodID] - if p == nil { - r.logger.Warnw("ignoring entry for unknown timeperiod_id", - zap.Int64("timeperiod_entry_id", row.ID), - zap.Int64("timeperiod_id", row.TimePeriodID)) + entryLogger := r.logger.With( + zap.Int64("timeperiod_entry_id", row.ID), + zap.Int64("timeperiod_id", row.TimePeriodID)) + + p, ok := timePeriodsById[row.TimePeriodID] + if !ok { + entryLogger.Warn("Ignoring entry for unknown timeperiod_id") + continue + } else if p == nil { + entryLogger.Debug("Skipping deleted entry for unknown timeperiod_id") continue } @@ -64,8 +91,7 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error loc, err := time.LoadLocation(row.Timezone) if err != nil { - r.logger.Warnw("ignoring time period entry with unknown timezone", - zap.Int64("timeperiod_entry_id", row.ID), + entryLogger.Warnw("Ignoring time period entry with unknown timezone", zap.String("timezone", row.Timezone), zap.Error(err)) continue @@ -83,8 +109,7 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error err = entry.Init() if err != nil { - r.logger.Warnw("ignoring time period entry", - zap.Int64("timeperiod_entry_id", row.ID), + entryLogger.Warnw("Ignoring time period entry", zap.String("rrule", entry.RecurrenceRule), zap.Error(err)) continue @@ -92,7 +117,7 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error p.Entries = append(p.Entries, entry) - r.logger.Debugw("loaded time period entry", + entryLogger.Debugw("Loaded time period entry", zap.String("timeperiod", p.Name), zap.Time("start", entry.Start), zap.Time("end", entry.End), @@ -100,21 +125,17 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error } for _, p := range timePeriodsById { - if p.Name == "" { - p.Name = fmt.Sprintf("Time Period #%d (empty)", p.ID) + if p == nil { + continue } - } - if r.TimePeriods != nil { - // mark no longer existing time periods for deletion - for id := range r.TimePeriods { - if _, ok := timePeriodsById[id]; !ok { - timePeriodsById[id] = nil - } + if p.Name == "" { + p.Name = fmt.Sprintf("Time Period #%d (empty)", p.ID) } } r.pending.TimePeriods = timePeriodsById + r.pendingLastChange[dbutils.TableName(timePeriodPtr)] = changedAt return nil } diff --git a/internal/config/utils.go b/internal/config/utils.go new file mode 100644 index 000000000..f9d1789e9 --- /dev/null +++ b/internal/config/utils.go @@ -0,0 +1,63 @@ +package config + +import ( + "context" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" + "go.uber.org/zap" + "strings" +) + +// buildSelectStmtWhereChangedAt creates a SQL SELECT for an incremental configuration synchronization. +// +// The query, which will be a prepared statement, expects a types.UnixMilli parameter to be compared against. This +// parameter might be NULL, being COALESCEd to a numeric zero, returning all rows. +func (r *RuntimeConfig) buildSelectStmtWhereChangedAt(typePtr interface{}) string { + return r.db.Rebind(r.db.BuildSelectStmt(typePtr, typePtr) + + ` WHERE "changed_at" > COALESCE(?, CAST(0 AS BIGINT)) ORDER BY "changed_at"`) +} + +// selectRelationshipTableEntries constructs and execute a SELECT query for the config relationship tables. +// +// A SELECT query against the relationship tables without the changed_at and deleted fields is generated for either all +// entries at the initial sync (selectAll := true) or a subset specified by `ids` in the `idField`. In case of an +// incremental sync without any `ìds`, the function directly returns without performing any query. +func (r *RuntimeConfig) selectRelationshipTableEntries( + ctx context.Context, + tx *sqlx.Tx, + typePtr interface{}, + idField string, + ids []int64, + selectAll bool, + dest interface{}, +) error { + if !selectAll && len(ids) == 0 { + r.logger.Debugw("Skipping query fetching relationship table as no IDs were requested", + zap.String("table", utils.TableName(typePtr))) + return nil + } + + stmt := r.db.BuildSelectStmt(typePtr, typePtr) + if !selectAll { + stmt += ` WHERE "` + idField + `" IN (` + stmt += strings.Repeat("?,", len(ids)) + stmt = stmt[:len(stmt)-1] + ")" + } + stmt = r.db.Rebind(stmt) + + args := make([]interface{}, 0, len(ids)) + if !selectAll { + for id := range ids { + args = append(args, id) + } + } + + r.logger.Debugw("Executing query to fetch relationship table", + zap.String("table", utils.TableName(typePtr)), + zap.String("id_field", idField), + zap.String("query", stmt), + zap.Bool("select_all", selectAll), + zap.Int("ids_in", len(ids))) + + return tx.SelectContext(ctx, dest, stmt, args...) +} diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index 23b24cd7f..9a9dba27d 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -23,7 +23,14 @@ func TestLoadOpenIncidents(t *testing.T) { db := testutils.GetTestDB(ctx, t) // Insert a dummy sources for our test cases! - source := config.Source{ID: 1, Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + source := config.Source{ + ID: 1, + Type: "notifications", + Name: "Icinga Notifications", + Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}, + ChangedAt: types.UnixMilli(time.UnixMilli(1700000000)), + IsDeleted: types.Bool{Bool: false, Valid: true}, + } stmt, _ := db.BuildInsertStmt(source) _, err := db.NamedExecContext(ctx, stmt, source) require.NoError(t, err, "populating source table should not fail") diff --git a/internal/recipient/contact.go b/internal/recipient/contact.go index bf0eeb96a..b4666472b 100644 --- a/internal/recipient/contact.go +++ b/internal/recipient/contact.go @@ -2,6 +2,7 @@ package recipient import ( "database/sql" + "github.com/icinga/icingadb/pkg/types" "time" ) @@ -11,6 +12,9 @@ type Contact struct { Username sql.NullString `db:"username"` DefaultChannelID int64 `db:"default_channel_id"` Addresses []*Address + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } func (c *Contact) String() string { @@ -28,6 +32,9 @@ type Address struct { ContactID int64 `db:"contact_id"` Type string `db:"type"` Address string `db:"address"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } func (a *Address) TableName() string { diff --git a/internal/recipient/group.go b/internal/recipient/group.go index 1325667a6..6493602c6 100644 --- a/internal/recipient/group.go +++ b/internal/recipient/group.go @@ -1,12 +1,18 @@ package recipient -import "time" +import ( + "github.com/icinga/icingadb/pkg/types" + "time" +) type Group struct { ID int64 `db:"id"` Name string `db:"name"` Members []*Contact MemberIDs []int64 + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } func (g *Group) GetContactsAt(t time.Time) []*Contact { diff --git a/internal/recipient/schedule.go b/internal/recipient/schedule.go index 3be25bd42..22e90837f 100644 --- a/internal/recipient/schedule.go +++ b/internal/recipient/schedule.go @@ -3,6 +3,7 @@ package recipient import ( "database/sql" "github.com/icinga/icinga-notifications/internal/timeperiod" + "github.com/icinga/icingadb/pkg/types" "time" ) @@ -11,6 +12,9 @@ type Schedule struct { Name string `db:"name"` Members []*Member MemberRows []*ScheduleMemberRow + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } type Member struct { diff --git a/internal/rule/rule.go b/internal/rule/rule.go index b1f130574..55e1e2546 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -15,4 +15,7 @@ type Rule struct { ObjectFilter filter.Filter `db:"-"` ObjectFilterExpr types.String `db:"object_filter"` Escalations map[int64]*Escalation + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } diff --git a/internal/timeperiod/timeperiod.go b/internal/timeperiod/timeperiod.go index fb2ad6e60..4430e65cd 100644 --- a/internal/timeperiod/timeperiod.go +++ b/internal/timeperiod/timeperiod.go @@ -1,6 +1,7 @@ package timeperiod import ( + "github.com/icinga/icingadb/pkg/types" "github.com/teambition/rrule-go" "log" "time" @@ -10,6 +11,9 @@ type TimePeriod struct { ID int64 `db:"id"` Name string Entries []*Entry + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } func (p *TimePeriod) TableName() string { diff --git a/internal/utils/utils.go b/internal/utils/utils.go index f936e29e0..8ab3e0e0b 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -182,3 +182,14 @@ func IterateOrderedMap[K cmp.Ordered, V any](m map[K]V) func(func(K, V) bool) { } } } + +// MapKeys returns the keys of a map. +// +// As Golang maps are unordered, the returned keys are also in no particular or reproducible order. +func MapKeys[K comparable, V any](m map[K]V) []K { + keys := make([]K, 0, len(m)) + for key := range m { + keys = append(keys, key) + } + return keys +} diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index ad4ede4d1..28f0c7ffa 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -44,9 +44,14 @@ CREATE TABLE channel ( -- for now type determines the implementation, in the future, this will need a reference to a concrete -- implementation to allow multiple implementations of a sms channel for example, probably even user-provided ones + changed_at bigint NOT NULL DEFAULT extract(epoch from now()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_channel PRIMARY KEY (id) ); +CREATE INDEX idx_channel_changed_at ON channel(changed_at); + CREATE TABLE contact ( id bigserial, full_name text NOT NULL, @@ -54,28 +59,44 @@ CREATE TABLE contact ( default_channel_id bigint NOT NULL REFERENCES channel(id), color varchar(7) NOT NULL, -- hex color codes e.g #000000 + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contact PRIMARY KEY (id), UNIQUE (username) ); +CREATE INDEX idx_contact_changed_at ON contact(changed_at); + CREATE TABLE contact_address ( id bigserial, contact_id bigint NOT NULL REFERENCES contact(id), type text NOT NULL, -- 'phone', 'email', ... address text NOT NULL, -- phone number, email address, ... + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contact_address PRIMARY KEY (id), UNIQUE (contact_id, type) -- constraint may be relaxed in the future to support multiple addresses per type ); +CREATE INDEX idx_contact_address_changed_at ON contact_address(changed_at); + CREATE TABLE contactgroup ( id bigserial, name text NOT NULL, color varchar(7) NOT NULL, -- hex color codes e.g #000000 + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contactgroup PRIMARY KEY (id) ); +CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); + +-- Changes to contactgroup_member should be notified by an updated contactgroup.changed_at. CREATE TABLE contactgroup_member ( contactgroup_id bigint NOT NULL REFERENCES contactgroup(id), contact_id bigint NOT NULL REFERENCES contact(id), @@ -87,16 +108,27 @@ CREATE TABLE schedule ( id bigserial, name text NOT NULL, + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_schedule PRIMARY KEY (id) ); +CREATE INDEX idx_schedule_changed_at ON schedule(changed_at); + CREATE TABLE timeperiod ( id bigserial, owned_by_schedule_id bigint REFERENCES schedule(id), -- nullable for future standalone timeperiods + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_timeperiod PRIMARY KEY (id) ); +CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); + +-- Changes to timeperiod_entry should be notified by an updated timeperiod.changed_at. CREATE TABLE timeperiod_entry ( id bigserial, timeperiod_id bigint NOT NULL REFERENCES timeperiod(id), @@ -115,6 +147,7 @@ CREATE TABLE timeperiod_entry ( CONSTRAINT pk_timeperiod_entry PRIMARY KEY (id) ); +-- Changes to schedule_member should be notified by an updated schedule.changed_at. CREATE TABLE schedule_member ( schedule_id bigint NOT NULL REFERENCES schedule(id), timeperiod_id bigint NOT NULL REFERENCES timeperiod(id), @@ -156,6 +189,9 @@ CREATE TABLE source ( icinga2_common_name text, icinga2_insecure_tls boolenum NOT NULL DEFAULT 'n', + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + -- The hash is a PHP password_hash with PASSWORD_DEFAULT algorithm, defaulting to bcrypt. This check roughly ensures -- that listener_password_hash can only be populated with bcrypt hashes. -- https://icinga.com/docs/icinga-web/latest/doc/20-Advanced-Topics/#manual-user-creation-for-database-authentication-backend @@ -165,6 +201,8 @@ CREATE TABLE source ( CONSTRAINT pk_source PRIMARY KEY (id) ); +CREATE INDEX idx_source_changed_at ON source(changed_at); + CREATE TABLE object ( id bytea NOT NULL, -- SHA256 of identifying tags and the source.id source_id bigint NOT NULL REFERENCES source(id), @@ -214,9 +252,15 @@ CREATE TABLE rule ( object_filter text, is_active boolenum NOT NULL DEFAULT 'y', + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule PRIMARY KEY (id) ); +CREATE INDEX idx_rule_changed_at ON rule(changed_at); + +-- Changes to rule_escalation should be notified by an updated rule.changed_at. CREATE TABLE rule_escalation ( id bigserial, rule_id bigint NOT NULL REFERENCES rule(id), @@ -231,6 +275,7 @@ CREATE TABLE rule_escalation ( CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)) ); +-- Changes to rule_escalation_recipient should be notified by an updated rule.changed_at via rule_escalation.rule_id. CREATE TABLE rule_escalation_recipient ( id bigserial, rule_escalation_id bigint NOT NULL REFERENCES rule_escalation(id), diff --git a/schema/pgsql/upgrades/025.sql b/schema/pgsql/upgrades/025.sql new file mode 100644 index 000000000..22f734626 --- /dev/null +++ b/schema/pgsql/upgrades/025.sql @@ -0,0 +1,40 @@ +ALTER TABLE channel + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE contact + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE contact_address + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE contactgroup + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE timeperiod + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE schedule + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE rule + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE source + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_channel_changed_at ON channel(changed_at); +CREATE INDEX idx_contact_changed_at ON contact(changed_at); +CREATE INDEX idx_contact_address_changed_at ON contact_address(changed_at); +CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); +CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); +CREATE INDEX idx_schedule_changed_at ON schedule(changed_at); +CREATE INDEX idx_rule_changed_at ON rule(changed_at); +CREATE INDEX idx_source_changed_at ON source(changed_at);