Skip to content

Commit

Permalink
Incremental Config Updates
Browse files Browse the repository at this point in the history
Enable incremental configuration updates by introducing two new columns
- changed_at and deleted - for all tables directly referenced in the
ConfigSet. The other relationship tables are requiring a changed_at
update in their relative parent table.

As a limitation, deleted rows within the database cannot be detected as
the deletion logic now completely relies on the deleted column. Thus,
deletions must be performed by setting both changed_at and deleted.

Closes #5.
  • Loading branch information
oxzi committed May 7, 2024
1 parent 1bd95b5 commit 4c482cf
Show file tree
Hide file tree
Showing 20 changed files with 498 additions and 214 deletions.
4 changes: 4 additions & 0 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/pkg/plugin"
"github.com/icinga/icingadb/pkg/types"
"go.uber.org/zap"
"net/url"
)
Expand All @@ -18,6 +19,9 @@ type Channel struct {
Type string `db:"type"`
Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information

ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"`
IsDeleted types.Bool `db:"deleted" json:"deleted"`

Logger *zap.SugaredLogger

restartCh chan newConfig
Expand Down
62 changes: 31 additions & 31 deletions internal/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,47 @@ package config
import (
"context"
"github.com/icinga/icinga-notifications/internal/channel"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error {
var channelPtr *channel.Channel
stmt := r.db.BuildSelectStmt(channelPtr, channelPtr)
r.logger.Debugf("Executing query %q", stmt)
changedAt := r.pendingLastChange[utils.TableName(channelPtr)]
stmt := r.buildSelectStmtWhereChangedAt(channelPtr)
r.logger.Debugw("Executing query to fetch channels",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))

var channels []*channel.Channel
if err := tx.SelectContext(ctx, &channels, stmt); err != nil {
if err := tx.SelectContext(ctx, &channels, stmt, changedAt); err != nil {
r.logger.Errorln(err)
return err
}

channelsById := make(map[int64]*channel.Channel)
for _, c := range channels {
changedAt = c.ChangedAt

channelLogger := r.logger.With(
zap.Int64("id", c.ID),
zap.String("name", c.Name),
zap.String("type", c.Type),
)
if channelsById[c.ID] != nil {
channelLogger.Warnw("ignoring duplicate config for channel type")
zap.Time("changed_at", c.ChangedAt.Time()))

if c.IsDeleted.Valid && c.IsDeleted.Bool {
channelsById[c.ID] = nil
channelLogger.Debug("Marking channel as deleted")
} else if err := channel.ValidateType(c.Type); err != nil {
channelLogger.Errorw("Cannot load channel config", zap.Error(err))
} else {
channelsById[c.ID] = c

channelLogger.Debugw("loaded channel config")
}
}

if r.Channels != nil {
// mark no longer existing channels for deletion
for id := range r.Channels {
if _, ok := channelsById[id]; !ok {
channelsById[id] = nil
}
channelLogger.Debug("Loaded channel config")
}
}

r.pending.Channels = channelsById
r.pendingLastChange[utils.TableName(channelPtr)] = changedAt

return nil
}
Expand All @@ -56,28 +54,30 @@ func (r *RuntimeConfig) applyPendingChannels() {
}

for id, pendingChannel := range r.pending.Channels {
existingChannel, existingChannelOk := r.Channels[id]
if pendingChannel == nil {
r.Channels[id].Logger.Info("Channel has been removed")
r.Channels[id].Stop()
if !existingChannelOk {
continue
}
existingChannel.Logger.Info("Channel has been removed")

existingChannel.Stop()
delete(r.Channels, id)
} else if currentChannel := r.Channels[id]; currentChannel != nil {
// Currently, the whole config is reloaded from the database frequently, replacing everything.
// Prevent restarting the plugin processes every time by explicitly checking for config changes.
// The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5
// is solved properly.
if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config {
currentChannel.Type = pendingChannel.Type
currentChannel.Name = pendingChannel.Name
currentChannel.Config = pendingChannel.Config

currentChannel.Restart()
}
} else if existingChannelOk {
existingChannel.Logger.Info("Channel has been updated")

existingChannel.Name = pendingChannel.Name
existingChannel.Type = pendingChannel.Type
existingChannel.Config = pendingChannel.Config

existingChannel.Restart()
} else {
pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With(
zap.Int64("id", pendingChannel.ID),
zap.String("name", pendingChannel.Name)))

pendingChannel.Logger.Info("Channel has been added")

r.Channels[id] = pendingChannel
}
}
Expand Down
44 changes: 30 additions & 14 deletions internal/config/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,44 @@ package config
import (
"context"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error {
var contactPtr *recipient.Contact
stmt := r.db.BuildSelectStmt(contactPtr, contactPtr)
r.logger.Debugf("Executing query %q", stmt)
changedAt := r.pendingLastChange[utils.TableName(contactPtr)]
stmt := r.buildSelectStmtWhereChangedAt(contactPtr)
r.logger.Debugw("Executing query to fetch contacts",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))

var contacts []*recipient.Contact
if err := tx.SelectContext(ctx, &contacts, stmt); err != nil {
if err := tx.SelectContext(ctx, &contacts, stmt, changedAt); err != nil {
r.logger.Errorln(err)
return err
}

contactsByID := make(map[int64]*recipient.Contact)
for _, c := range contacts {
contactsByID[c.ID] = c
changedAt = c.ChangedAt

r.logger.Debugw("loaded contact config",
contactLogger := r.logger.With(
zap.Int64("id", c.ID),
zap.String("name", c.FullName))
}
zap.String("name", c.FullName),
zap.Time("changed_at", c.ChangedAt.Time()))

if r.Contacts != nil {
// mark no longer existing contacts for deletion
for id := range r.Contacts {
if _, ok := contactsByID[id]; !ok {
contactsByID[id] = nil
}
if c.IsDeleted.Valid && c.IsDeleted.Bool {
contactsByID[c.ID] = nil
contactLogger.Debug("Marking contact as deleted")
} else {
contactsByID[c.ID] = c
contactLogger.Debug("Loaded contact config")
}
}

r.pending.Contacts = contactsByID
r.pendingLastChange[utils.TableName(contactPtr)] = changedAt

return nil
}
Expand All @@ -47,13 +51,25 @@ func (r *RuntimeConfig) applyPendingContacts() {
}

for id, pendingContact := range r.pending.Contacts {
contactLogger := r.logger.With(zap.Int64("id", id))
currentContact, currentContactOk := r.Contacts[id]

if pendingContact == nil {
if !currentContactOk {
continue
}
contactLogger.Info("Contact has been removed")

delete(r.Contacts, id)
} else if currentContact := r.Contacts[id]; currentContact != nil {
} else if currentContactOk {
contactLogger.Info("Contact has been updated")

currentContact.FullName = pendingContact.FullName
currentContact.Username = pendingContact.Username
currentContact.DefaultChannelID = pendingContact.DefaultChannelID
} else {
contactLogger.Info("Contact has been added")

r.Contacts[id] = pendingContact
}
}
Expand Down
37 changes: 22 additions & 15 deletions internal/config/contact_address.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,47 @@ package config
import (
"context"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error {
var addressPtr *recipient.Address
stmt := r.db.BuildSelectStmt(addressPtr, addressPtr)
r.logger.Debugf("Executing query %q", stmt)
changedAt := r.pendingLastChange[utils.TableName(addressPtr)]
stmt := r.buildSelectStmtWhereChangedAt(addressPtr)
r.logger.Debugw("Executing query to fetch contact addresses",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))

var addresses []*recipient.Address
if err := tx.SelectContext(ctx, &addresses, stmt); err != nil {
if err := tx.SelectContext(ctx, &addresses, stmt, changedAt); err != nil {
r.logger.Errorln(err)
return err
}

addressesById := make(map[int64]*recipient.Address)
for _, a := range addresses {
addressesById[a.ID] = a
r.logger.Debugw("loaded contact_address config",
changedAt = a.ChangedAt

addressLogger := r.logger.With(
zap.Int64("id", a.ID),
zap.Int64("contact_id", a.ContactID),
zap.String("type", a.Type),
zap.String("address", a.Address),
)
}
zap.Time("changed_at", a.ChangedAt.Time()))

if r.ContactAddresses != nil {
// mark no longer existing contacts for deletion
for id := range r.ContactAddresses {
if _, ok := addressesById[id]; !ok {
addressesById[id] = nil
}
if a.IsDeleted.Valid && a.IsDeleted.Bool {
addressesById[a.ID] = nil
addressLogger.Debug("Marking contact address as deleted")
} else {
addressesById[a.ID] = a
addressLogger.Debug("Loaded contact address config")
}
}

r.pending.ContactAddresses = addressesById
r.pendingLastChange[utils.TableName(addressPtr)] = changedAt

return nil
}
Expand All @@ -50,11 +54,14 @@ func (r *RuntimeConfig) applyPendingContactAddresses() {
}

for id, pendingAddress := range r.pending.ContactAddresses {
currentAddress := r.ContactAddresses[id]
currentAddress, currentAddressOk := r.ContactAddresses[id]

if pendingAddress == nil {
if !currentAddressOk {
continue
}
r.removeContactAddress(currentAddress)
} else if currentAddress != nil {
} else if currentAddressOk {
r.updateContactAddress(currentAddress, pendingAddress)
} else {
r.addContactAddress(pendingAddress)
Expand Down
64 changes: 38 additions & 26 deletions internal/config/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,58 @@ package config
import (
"context"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
dbutils "github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error {
var groupPtr *recipient.Group
stmt := r.db.BuildSelectStmt(groupPtr, groupPtr)
r.logger.Debugf("Executing query %q", stmt)
changedAt := r.pendingLastChange[dbutils.TableName(groupPtr)]
stmt := r.buildSelectStmtWhereChangedAt(groupPtr)
r.logger.Debugw("Executing query to fetch groups",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))

var groups []*recipient.Group
if err := tx.SelectContext(ctx, &groups, stmt); err != nil {
if err := tx.SelectContext(ctx, &groups, stmt, changedAt); err != nil {
r.logger.Errorln(err)
return err
}

groupsById := make(map[int64]*recipient.Group)
for _, g := range groups {
groupsById[g.ID] = g
changedAt = g.ChangedAt

r.logger.Debugw("loaded group config",
groupLogger := r.logger.With(
zap.Int64("id", g.ID),
zap.String("name", g.Name))
zap.String("name", g.Name),
zap.Time("changed_at", g.ChangedAt.Time()))

if g.IsDeleted.Valid && g.IsDeleted.Bool {
groupsById[g.ID] = nil
groupLogger.Debug("Marking group as deleted")
} else {
groupsById[g.ID] = g
groupLogger.Debug("Loaded group config")
}
}

type ContactgroupMember struct {
GroupId int64 `db:"contactgroup_id"`
ContactId int64 `db:"contact_id"`
}

var memberPtr *ContactgroupMember
stmt = r.db.BuildSelectStmt(memberPtr, memberPtr)
r.logger.Debugf("Executing query %q", stmt)

var members []*ContactgroupMember
if err := tx.SelectContext(ctx, &members, stmt); err != nil {
err := r.selectRelationshipTableEntries(
ctx,
tx,
new(ContactgroupMember),
"contactgroup_id",
utils.MapKeys(groupsById),
r.Groups == nil,
&members)
if err != nil {
r.logger.Errorln(err)
return err
}
Expand All @@ -48,26 +65,21 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error {
zap.Int64("contactgroup_id", m.GroupId),
)

if g := groupsById[m.GroupId]; g == nil {
memberLogger.Warnw("ignoring member for unknown contactgroup_id")
} else {
g.MemberIDs = append(g.MemberIDs, m.ContactId)

memberLogger.Debugw("loaded contact group member",
zap.String("contactgroup_name", g.Name))
g, ok := groupsById[m.GroupId]
if !ok {
memberLogger.Warn("Ignoring member for unknown contactgroup_id")
continue
} else if g == nil {
memberLogger.Debug("Skipping deleted member for unknown contactgroup_id")
continue
}
}

if r.Groups != nil {
// mark no longer existing groups for deletion
for id := range r.Groups {
if _, ok := groupsById[id]; !ok {
groupsById[id] = nil
}
}
g.MemberIDs = append(g.MemberIDs, m.ContactId)
memberLogger.Info("Loaded contact group member", zap.String("contactgroup_name", g.Name))
}

r.pending.Groups = groupsById
r.pendingLastChange[dbutils.TableName(groupPtr)] = changedAt

return nil
}
Expand Down
Loading

0 comments on commit 4c482cf

Please sign in to comment.