From 02603883b729383fcdca330004c7261d6247474b Mon Sep 17 00:00:00 2001 From: Sukhwinder Dhillon Date: Tue, 10 Oct 2023 12:22:52 +0200 Subject: [PATCH] WIP --- cmd/channel/email/main.go | 78 +++++++++- cmd/channel/rocketchat/main.go | 45 +++++- cmd/icinga-notifications-daemon/main.go | 3 + internal/channel/channel.go | 199 ++++++++++++++---------- internal/channel/plugin.go | 184 ++++++++++++++++------ internal/config/channel.go | 15 +- internal/incident/incident.go | 19 +-- pkg/plugin/plugin.go | 31 +++- pkg/rpc/rpc.go | 17 +- schema/pgsql/schema.sql | 12 +- schema/pgsql/upgrades/plugin_table.sql | 15 ++ 11 files changed, 458 insertions(+), 160 deletions(-) create mode 100644 schema/pgsql/upgrades/plugin_table.sql diff --git a/cmd/channel/email/main.go b/cmd/channel/email/main.go index ac6edd56b..543c76290 100644 --- a/cmd/channel/email/main.go +++ b/cmd/channel/email/main.go @@ -3,7 +3,9 @@ package main import ( "bytes" "encoding/json" + "errors" "fmt" + "github.com/icinga/icinga-notifications/internal" "github.com/icinga/icinga-notifications/pkg/plugin" "net" "net/smtp" @@ -54,6 +56,10 @@ func (ch *Email) SetConfig(jsonStr json.RawMessage) error { return fmt.Errorf("failed to load config: %s %w", jsonStr, err) } + if ch.From == "fail" { //TODO: remove it + return errors.New("dummy fail") + } + if ch.Host == "" { ch.Host = "localhost" } @@ -80,7 +86,77 @@ func (ch *Email) SetConfig(jsonStr json.RawMessage) error { } func (ch *Email) GetInfo() *plugin.Info { - return &plugin.Info{Name: "Email"} + elements := []*plugin.FormElement{ + { + Name: "host", + Type: "string", + Label: map[string]string{ + "en_US": "SMTP Host", + "de_DE": "SMTP Host", + }, + Default: "localhost", + }, + { + Name: "port", + Type: "option", + Label: map[string]string{ + "en_US": "SMTP Port", + "de_DE": "SMTP Port", + }, + Options: map[string]string{ + "25": "25", + "465": "465", + "587": "587", + "2525": "2525", + }, + }, + { + Name: "from", + Type: "string", + Label: map[string]string{ + "en_US": "From", + "de_DE": "From", + }, + Default: "notifications@icinga", + }, + { + Name: "password", + Type: "secret", + Label: map[string]string{ + "en_US": "Password", + "de_DE": "Password", + }, + }, + { + Name: "tls", + Type: "bool", + Label: map[string]string{ + "en_US": "TLS / SSL", + "de_DE": "TLS / SSL", + }, + }, + { + Name: "tls_certcheck", + Type: "bool", + Label: map[string]string{ + "en_US": "Certificate Check", + "de_DE": "Certificate Check", + }, + }, + } + + configAttrs, err := json.Marshal(elements) + if err != nil { + panic(err) + } + + return &plugin.Info{ + Type: "email", + Name: "Email", + Version: internal.Version.Version, + AuthorName: "Icinga GmbH", + ConfigAttributes: configAttrs, + } } func (ch *Email) GetServer() string { diff --git a/cmd/channel/rocketchat/main.go b/cmd/channel/rocketchat/main.go index db919529b..f3e6026b9 100644 --- a/cmd/channel/rocketchat/main.go +++ b/cmd/channel/rocketchat/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/icinga/icinga-notifications/internal" "github.com/icinga/icinga-notifications/pkg/plugin" "net/http" "time" @@ -80,5 +81,47 @@ func (ch *RocketChat) SetConfig(jsonStr json.RawMessage) error { } func (ch *RocketChat) GetInfo() *plugin.Info { - return &plugin.Info{Name: "Rocket.Chat"} + + elements := []*plugin.FormElement{ + { + Name: "url", + Type: "string", + Label: map[string]string{ + "en_US": "Rocket.Chat URL", + "de_DE": "Rocket.Chat URL", + }, + Required: true, + }, + { + Name: "user_id", + Type: "string", + Label: map[string]string{ + "en_US": "User ID", + "de_DE": "User ID", + }, + Required: true, + }, + { + Name: "token", + Type: "secret", + Label: map[string]string{ + "en_US": "Personal Access Token", + "de_DE": "Personal Access Token", + }, + Required: true, + }, + } + + configAttrs, err := json.Marshal(elements) + if err != nil { + panic(err) + } + + return &plugin.Info{ + Type: "rocketchat", + Name: "Rocket.Chat", + Version: internal.Version.Version, + AuthorName: "Icinga GmbH", + ConfigAttributes: configAttrs, + } } diff --git a/cmd/icinga-notifications-daemon/main.go b/cmd/icinga-notifications-daemon/main.go index 4a685245f..b94cc2fb4 100644 --- a/cmd/icinga-notifications-daemon/main.go +++ b/cmd/icinga-notifications-daemon/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/icinga/icinga-notifications/internal" + "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemonConfig" "github.com/icinga/icinga-notifications/internal/listener" @@ -76,6 +77,8 @@ func main() { } } + channel.SyncPlugins(conf.ChannelPluginDir, logs, db) + runtimeConfig := config.NewRuntimeConfig(db, logs) if err := runtimeConfig.UpdateFromDatabase(context.TODO()); err != nil { logger.Fatalw("failed to load config from database", zap.Error(err)) diff --git a/internal/channel/channel.go b/internal/channel/channel.go index 6486b18fe..cbbb90f55 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -1,17 +1,14 @@ package channel import ( - "bufio" + "errors" "fmt" - "github.com/icinga/icinga-notifications/internal/daemonConfig" - "github.com/icinga/icinga-notifications/pkg/rpc" + "github.com/icinga/icinga-notifications/internal/contracts" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/pkg/plugin" "go.uber.org/zap" - "io" - "os/exec" - "path/filepath" - "regexp" - "sync" - "time" + "strings" ) type Channel struct { @@ -21,109 +18,141 @@ type Channel struct { Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information Logger *zap.SugaredLogger - cmd *exec.Cmd - rpc *rpc.RPC - mu sync.Mutex -} -func (c *Channel) Start() error { - c.mu.Lock() - defer c.mu.Unlock() + newConfigCh chan struct{} + stopPluginCh chan struct{} + notificationCh chan Req +} - if c.cmd != nil && c.rpc.Err() == nil { - return nil - } +type Req struct { + req *plugin.NotificationRequest + out chan<- error +} - if matched, _ := regexp.MatchString("^[a-zA-Z0-9]*$", c.Type); !matched { - return fmt.Errorf("channel type must only contain a-zA-Z0-9, %q given", c.Type) - } +func (c *Channel) Start(logger *zap.SugaredLogger) { + c.Logger = logger + c.newConfigCh = make(chan struct{}) + c.stopPluginCh = make(chan struct{}) + c.notificationCh = make(chan Req, 1) - path := filepath.Join(daemonConfig.Config().ChannelPluginDir, c.Type) + go c.runPlugin() +} - cmd := exec.Command(path) +func (c *Channel) initPlugin() *Plugin { + c.Logger.Debug("Initializing channel plugin") - writer, err := cmd.StdinPipe() + p, err := NewPlugin(c.Type, c.Logger) if err != nil { - return fmt.Errorf("failed to create stdin pipe: %w", err) + c.Logger.Errorw("Failed to initialize channel plugin", zap.Error(err)) + return nil + } else if err := p.SetConfig(c.Config); err != nil { + c.Logger.Errorw("Failed to set channel plugin config", err) + go terminate(p) + return nil } - reader, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("failed to create stdout pipe: %w", err) - } + return p +} - errPipe, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("failed to create stderr pipe: %w", err) - } +func (c *Channel) runPlugin() { + var currentlyRunningPlugin *Plugin + for { + if currentlyRunningPlugin == nil { + currentlyRunningPlugin = c.initPlugin() + } - go forwardLogs(errPipe, c.Logger) + var rpcDone <-chan struct{} + if currentlyRunningPlugin != nil { + rpcDone = currentlyRunningPlugin.rpc.Done() + } - if err = cmd.Start(); err != nil { - return fmt.Errorf("failed to start cmd: %w", err) - } - c.Logger.Debug("Cmd started successfully") + select { + case <-rpcDone: + c.Logger.Debug("rpc.Done(): Restarting plugin") - c.cmd = cmd - c.rpc = rpc.NewRPC(writer, reader, c.Logger) + if currentlyRunningPlugin != nil { + go terminate(currentlyRunningPlugin) + currentlyRunningPlugin = nil + } - if err = c.SetConfig(c.Config); err != nil { - go c.terminate(c.cmd, c.rpc) + continue + case <-c.newConfigCh: + c.Logger.Debug("new config detected: Restarting plugin") - c.cmd = nil - c.rpc = nil + if currentlyRunningPlugin != nil { + go terminate(currentlyRunningPlugin) + currentlyRunningPlugin = nil + } - return fmt.Errorf("failed to set config: %w", err) - } - c.Logger.Debugw("Successfully set config", zap.String("config", c.Config)) + continue + case <-c.stopPluginCh: + c.Logger.Debug("Stopping the channel plugin") - return nil -} + if currentlyRunningPlugin != nil { + go terminate(currentlyRunningPlugin) + currentlyRunningPlugin = nil + } -func (c *Channel) Stop() { - c.mu.Lock() - defer c.mu.Unlock() + return + case req := <-c.notificationCh: + if currentlyRunningPlugin == nil { + currentlyRunningPlugin = c.initPlugin() + } - if c.cmd == nil { - c.Logger.Debug("channel plugin has already been stopped") - return + if currentlyRunningPlugin == nil { + errMsg := "cannot send notification, plugin is not running" + c.Logger.Debug(errMsg) + req.out <- errors.New(errMsg) + } else { + go func(p *Plugin) { + req.out <- p.SendNotification(req.req) + }(currentlyRunningPlugin) + } + } } +} - go c.terminate(c.cmd, c.rpc) - - c.cmd = nil - c.rpc = nil - - c.Logger.Debug("Stopped channel plugin successfully") +func (c *Channel) Stop() { + c.stopPluginCh <- struct{}{} } -func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) { - reader := bufio.NewReader(errPipe) - for { - line, err := reader.ReadString('\n') - if err != nil { - if err != io.EOF { - logger.Errorw("Failed to read stderr line", zap.Error(err)) - } +func (c *Channel) ReloadConfig() { + c.newConfigCh <- struct{}{} +} - return - } +func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error { + contactStruct := &plugin.Contact{FullName: contact.FullName} + for _, addr := range contact.Addresses { + contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) + } - logger.Info(line) + if !strings.HasSuffix(icingaweb2Url, "/") { + icingaweb2Url += "/" } -} -// run as go routine to terminate given channel -func (c *Channel) terminate(cmd *exec.Cmd, rpc *rpc.RPC) { - c.Logger.Debug("terminating channel plugin") - _ = rpc.Close() + req := &plugin.NotificationRequest{ + Contact: contactStruct, + Object: &plugin.Object{ + Name: i.ObjectDisplayName(), + Url: ev.URL, + Tags: ev.Tags, + ExtraTags: ev.ExtraTags, + }, + Incident: &plugin.Incident{ + Id: i.ID(), + Url: fmt.Sprintf("%snotifications/incident?id=%d", icingaweb2Url, i.ID()), + }, + Event: &plugin.Event{ + Time: ev.Time, + Type: ev.Type, + Severity: ev.Severity.String(), + Username: ev.Username, + Message: ev.Message, + }, + } - timer := time.AfterFunc(5*time.Second, func() { - c.Logger.Debug("killing the channel plugin") - _ = cmd.Process.Kill() - }) + out := make(chan error, 1) + c.notificationCh <- Req{req: req, out: out} - _ = cmd.Wait() - timer.Stop() - c.Logger.Debug("Channel plugin terminated successfully") + return <-out } diff --git a/internal/channel/plugin.go b/internal/channel/plugin.go index 8d510e980..216fc01af 100644 --- a/internal/channel/plugin.go +++ b/internal/channel/plugin.go @@ -1,19 +1,77 @@ package channel import ( + "bufio" "encoding/json" - "errors" "fmt" - "github.com/icinga/icinga-notifications/internal/contracts" - "github.com/icinga/icinga-notifications/internal/event" - "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/internal/daemonConfig" "github.com/icinga/icinga-notifications/pkg/plugin" "github.com/icinga/icinga-notifications/pkg/rpc" - "strings" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" + "go.uber.org/zap" + "io" + "os" + "os/exec" + "path/filepath" + "sync" + "time" ) -func (c *Channel) GetInfo() (*plugin.Info, error) { - result, err := c.rpcCall(plugin.MethodGetInfo, nil) +type Plugin struct { + cmd *exec.Cmd + rpc *rpc.RPC + mu sync.Mutex + logger *zap.SugaredLogger +} + +// NewPlugin starts and returns a new plugin instance. If the start of the plugin fails, an error is returned +func NewPlugin(pluginName string, logger *zap.SugaredLogger) (*Plugin, error) { + p := &Plugin{logger: logger} + + p.mu.Lock() + defer p.mu.Unlock() + + cmd := exec.Command(filepath.Join(daemonConfig.Config().ChannelPluginDir, pluginName)) + + writer, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdin pipe: %w", err) + } + + reader, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdout pipe: %w", err) + } + + errPipe, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stderr pipe: %w", err) + } + + go forwardLogs(errPipe, p.logger) + + if err = cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start cmd: %w", err) + } + p.logger.Info("Channel plugin started successfully") + + p.cmd = cmd + p.rpc = rpc.NewRPC(writer, reader, p.logger) + + return p, nil +} + +// Stop stops the plugin +func (p *Plugin) Stop() { + p.logger.Debug("plugin.Stop() triggered") + + go terminate(p) +} + +// GetInfo sends the PluginInfo request and returns the response or an error if an error occurred +func (p *Plugin) GetInfo() (*plugin.Info, error) { + result, err := p.rpc.Call(plugin.MethodGetInfo, nil) if err != nil { return nil, err } @@ -27,60 +85,94 @@ func (c *Channel) GetInfo() (*plugin.Info, error) { return info, nil } -func (c *Channel) SetConfig(config string) error { - _, err := c.rpcCall(plugin.MethodSetConfig, json.RawMessage(config)) +// SetConfig sends the setConfig request with given config, returns an error if an error occurred +func (p *Plugin) SetConfig(config string) error { + _, err := p.rpc.Call(plugin.MethodSetConfig, json.RawMessage(config)) return err } -func (c *Channel) SendNotification(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error { - contactStruct := &plugin.Contact{FullName: contact.FullName} - for _, addr := range contact.Addresses { - contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) +func (p *Plugin) SendNotification(req *plugin.NotificationRequest) error { + params, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("failed to prepare request params: %w", err) } - if !strings.HasSuffix(icingaweb2Url, "/") { - icingaweb2Url += "/" - } + _, err = p.rpc.Call(plugin.MethodSendNotification, params) + + return err +} + +func terminate(p *Plugin) { + p.logger.Debug("Stopping the channel plugin") + p.mu.Lock() + defer p.mu.Unlock() + + _ = p.rpc.Close() + timer := time.AfterFunc(5*time.Second, func() { + p.logger.Debug("killing the channel plugin") + _ = p.cmd.Process.Kill() + }) + + <-p.rpc.Done() + timer.Stop() + p.logger.Debug("Stopped channel plugin successfully") +} - req := plugin.NotificationRequest{ - Contact: contactStruct, - Object: &plugin.Object{ - Name: i.ObjectDisplayName(), - Url: ev.URL, - Tags: ev.Tags, - ExtraTags: ev.ExtraTags, - }, - Incident: &plugin.Incident{ - Id: i.ID(), - Url: fmt.Sprintf("%snotifications/incident?id=%d", icingaweb2Url, i.ID()), - }, - Event: &plugin.Event{ - Time: ev.Time, - Type: ev.Type, - Severity: ev.Severity.String(), - Username: ev.Username, - Message: ev.Message, - }, +func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) { + reader := bufio.NewReader(errPipe) + for { + line, err := reader.ReadString('\n') + if err != nil { + if err != io.EOF { + logger.Errorw("Failed to read stderr line", zap.Error(err)) + } + + return + } + + logger.Info(line) } +} - params, err := json.Marshal(req) +func SyncPlugins(channelPluginDir string, logs *logging.Logging, db *icingadb.DB) { + logger := logs.GetChildLogger("channel") + files, err := os.ReadDir(channelPluginDir) if err != nil { - return fmt.Errorf("failed to prepare request params: %w", err) + logger.Error(err) } - _, err = c.rpcCall(plugin.MethodSendNotification, params) + var pluginInfos []*plugin.Info - return err -} + for _, file := range files { + pluginLogger := logger.With(zap.String("name", file.Name())) + p, err := NewPlugin(file.Name(), pluginLogger) + if err != nil { + pluginLogger.Error(err) + continue + } + + info, err := p.GetInfo() + if err != nil { + p.logger.Error(err) + p.Stop() + continue + } + p.Stop() -func (c *Channel) rpcCall(method string, params json.RawMessage) (json.RawMessage, error) { - result, err := c.rpc.Call(method, params) + pluginInfos = append(pluginInfos, info) + } - var rpcErr *rpc.Error - if errors.As(err, &rpcErr) { - c.Stop() + if len(pluginInfos) == 0 { + logger.Info("No working plugin found") + return } - return result, err + stmt, _ := db.BuildUpsertStmt(&plugin.Info{}) + _, err = db.NamedExec(stmt, pluginInfos) + if err != nil { + logger.Error("failed to upsert channel plugin: ", err) + } else { + logger.Infof("Successfully upsert %d plugins", len(pluginInfos)) + } } diff --git a/internal/config/channel.go b/internal/config/channel.go index cc04572c1..8bb30271b 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icinga-notifications/internal/channel" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "regexp" ) func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { @@ -27,8 +28,9 @@ func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { ) if channelsById[c.ID] != nil { channelLogger.Warnw("ignoring duplicate config for channel type") + } else if matched, _ := regexp.MatchString("^[a-zA-Z0-9]*$", c.Type); !matched { + channelLogger.Errorf("channel type must only contain a-zA-Z0-9, %q given", c.Type) } else { - c.Logger = r.logs.GetChildLogger("channel").With(zap.Int64("id", c.ID), zap.String("name", c.Name)) channelsById[c.ID] = c channelLogger.Debugw("loaded channel config") @@ -56,6 +58,9 @@ func (r *RuntimeConfig) applyPendingChannels() { for id, pendingChannel := range r.pending.Channels { if pendingChannel == nil { + r.Channels[id].Logger.Info("Channel has been removed, stopping channel plugin") + r.Channels[id].Stop() + delete(r.Channels, id) } else if currentChannel := r.Channels[id]; currentChannel != nil { // Currently, the whole config is reloaded from the database frequently, replacing everything. @@ -67,10 +72,14 @@ func (r *RuntimeConfig) applyPendingChannels() { currentChannel.Name = pendingChannel.Name currentChannel.Config = pendingChannel.Config - currentChannel.Logger.Info("Stopping the channel plugin because the config has been changed") - currentChannel.Stop() + currentChannel.Logger.Info("Reloading the channel plugin config") + currentChannel.ReloadConfig() } } else { + pendingChannel.Start(r.logs.GetChildLogger("channel").With( + zap.Int64("id", pendingChannel.ID), + zap.String("name", pendingChannel.Name))) + r.Channels[id] = pendingChannel } } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 7926e9272..1db83b8af 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -441,21 +441,14 @@ func (i *Incident) notifyContacts(ctx context.Context, tx *sqlx.Tx, ev *event.Ev ) } - err = ch.Start() + err = ch.Notify(contact, i, ev, daemonConfig.Config().Icingaweb2URL) if err != nil { - i.logger.Errorw("Could not initialize channel", zap.String("type", chType), zap.Error(err)) - continue - } - - err = ch.SendNotification(contact, i, ev, daemonConfig.Config().Icingaweb2URL) - if err != nil { - i.logger.Errorw("Failed to send via channel", zap.String("type", chType), zap.Error(err)) - continue + i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) + } else { + i.logger.Infow( + "Successfully sent a notification via channel plugin", zap.String("type", ch.Type), zap.String("contact", contact.FullName), + ) } - - i.logger.Infow( - "Successfully sent a message via channel", zap.String("type", chType), zap.String("contact", contact.String()), - ) } } diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index e5c1093eb..e3c8bf582 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -18,9 +18,36 @@ const ( MethodSendNotification = "SendNotification" ) +type FormElement struct { + Name string `json:"name"` + Type string `json:"type"` + Label map[string]string `json:"label"` + Options map[string]string `json:"options,omitempty"` + Help map[string]string `json:"help,omitempty"` + Default any `json:"default,omitempty"` // bool OR string + Required bool `json:"required,omitempty"` +} + type Info struct { - Name string `json:"display_name"` - ConfigAttributes json.RawMessage `json:"config_attrs"` + Type string `db:"type" json:"type"` + Name string `db:"name" json:"name"` + Version string `db:"version" json:"version"` + AuthorName string `db:"author_name" json:"author_name"` + ConfigAttributes json.RawMessage `db:"config_attrs" json:"config_attrs"` +} + +// TableName implements the contracts.TableNamer interface. +func (c *Info) TableName() string { + return "plugin" +} + +// Upsert implements the contracts.Upserter interface. +func (c *Info) Upsert() interface{} { + return struct { + Version string `db:"version"` + AuthorName string `db:"author_name"` + ConfigAttributes json.RawMessage `db:"config_attrs"` + }{} } type Contact struct { diff --git a/pkg/rpc/rpc.go b/pkg/rpc/rpc.go index 2c1d806e6..2514691c5 100644 --- a/pkg/rpc/rpc.go +++ b/pkg/rpc/rpc.go @@ -47,7 +47,7 @@ type RPC struct { errChannel chan struct{} // never transports a value, only closed through setErr() to signal an occurred error err *Error // only initialized via setErr(), if a rpc (Fatal/non-recoverable) error has occurred - errMu sync.Mutex + errOnce sync.Once requestedShutdown bool } @@ -104,20 +104,24 @@ func (r *RPC) Call(method string, params json.RawMessage) (json.RawMessage, erro return response.Result, nil - case <-r.errChannel: + case <-r.Done(): return nil, r.Err() } } func (r *RPC) Err() error { select { - case <-r.errChannel: + case <-r.Done(): return r.err default: return nil } } +func (r *RPC) Done() <-chan struct{} { + return r.errChannel +} + func (r *RPC) Close() error { r.encoderMu.Lock() defer r.encoderMu.Unlock() @@ -127,10 +131,7 @@ func (r *RPC) Close() error { } func (r *RPC) setErr(err error) { - r.errMu.Lock() - defer r.errMu.Unlock() - - if r.err == nil { + r.errOnce.Do(func() { pendingReqMsg := fmt.Sprintf("cancelling %d pending request(s)", len(r.pendingRequests)) if r.requestedShutdown { r.logger.Infof("Plugin shutdown triggered: %s", pendingReqMsg) @@ -140,7 +141,7 @@ func (r *RPC) setErr(err error) { r.err = &Error{cause: err} close(r.errChannel) - } + }) } // processResponses sends responses to its channel (identified by response.id) diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 62591aa1d..7a596587c 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -16,10 +16,20 @@ CREATE OR REPLACE FUNCTION anynonarrayliketext(anynonarray, text) $$; CREATE OPERATOR ~~ (LEFTARG=anynonarray, RIGHTARG=text, PROCEDURE=anynonarrayliketext); +CREATE TABLE plugin ( + type text NOT NULL, + name text NOT NULL, + version text NOT NULL, + author_name text NOT NULL, + config_attrs text NOT NULL, + + CONSTRAINT pk_plugin PRIMARY KEY (type) +); + CREATE TABLE channel ( id bigserial, name text NOT NULL, - type text NOT NULL, -- 'email', 'sms', ... + type text NOT NULL REFERENCES plugin(type), -- 'email', 'sms', ... config text, -- JSON with channel-specific attributes -- for now type determines the implementation, in the future, this will need a reference to a concrete -- implementation to allow multiple implementations of a sms channel for example, probably even user-provided ones diff --git a/schema/pgsql/upgrades/plugin_table.sql b/schema/pgsql/upgrades/plugin_table.sql new file mode 100644 index 000000000..233dfbdc2 --- /dev/null +++ b/schema/pgsql/upgrades/plugin_table.sql @@ -0,0 +1,15 @@ +CREATE TABLE plugin ( + type text NOT NULL, + name text NOT NULL, + version text NOT NULL, + author_name text NOT NULL, + config_attrs text NOT NULL, + + CONSTRAINT pk_plugin PRIMARY KEY (type) +); + +INSERT INTO plugin (type, name, version, author_name, config_attrs) + SELECT type, type, '0.0.0', 'ICINGA GmbH', '' FROM channel; + +ALTER TABLE channel + ADD CONSTRAINT fk_channel_plugin FOREIGN KEY (type) REFERENCES plugin(type);