diff --git a/internal/channel/channel.go b/internal/channel/channel.go index be6e9f671..355a6dbfa 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -1,21 +1,13 @@ package channel import ( - "bufio" "fmt" "github.com/icinga/icinga-notifications/internal/contracts" - "github.com/icinga/icinga-notifications/internal/daemonConfig" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/pkg/plugin" - "github.com/icinga/icinga-notifications/pkg/rpc" "go.uber.org/zap" - "io" - "os/exec" - "path/filepath" "strings" - "sync" - "time" ) type Channel struct { @@ -31,7 +23,7 @@ type Channel struct { notificationCh chan *plugin.NotificationRequest } -func (c *Channel) InitPlugin() { +func (c *Channel) RunPlugin() { var currentlyRunningPlugin *Plugin c.newConfigCh = make(chan string, 1) @@ -43,13 +35,13 @@ func (c *Channel) InitPlugin() { currentlyRunningPlugin = &Plugin{Logger: c.Logger} if err := currentlyRunningPlugin.Start(c.Type); err != nil { - c.Logger.Errorw("Could not initialize channel plugin", zap.String("type", c.Type), zap.Error(err)) + c.Logger.Errorw("Failed to initialize channel plugin", zap.String("type", c.Type), zap.Error(err)) return } if err := currentlyRunningPlugin.SetConfig(c.Config); err != nil { - c.Logger.Errorw("failed to set channel plugin config: %w", err) - close(c.stopPluginCh) + c.Logger.Errorw("Failed to set channel plugin config", err) + c.StopPlugin() } } @@ -85,6 +77,14 @@ func (c *Channel) InitPlugin() { } } +func (c *Channel) StopPlugin() { + if c.IsPluginStopped() { + return + } + + close(c.stopPluginCh) +} + func (c *Channel) ReloadConfig() { c.newConfigCh <- c.Config } @@ -136,82 +136,3 @@ func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *e c.notificationCh <- req } - -type Plugin struct { - cmd *exec.Cmd - rpc *rpc.RPC - mu sync.Mutex - Logger *zap.SugaredLogger -} - -func (p *Plugin) Start(pluginName string) error { - p.mu.Lock() - defer p.mu.Unlock() - - cmd := exec.Command(filepath.Join(daemonConfig.Config().ChannelPluginDir, pluginName)) - - writer, err := cmd.StdinPipe() - if err != nil { - return fmt.Errorf("failed to create stdin pipe: %w", err) - } - - reader, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("failed to create stdout pipe: %w", err) - } - - errPipe, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("failed to create stderr pipe: %w", err) - } - - go forwardLogs(errPipe, p.Logger) - - if err = cmd.Start(); err != nil { - return fmt.Errorf("failed to start cmd: %w", err) - } - p.Logger.Debug("Cmd started successfully") - - p.cmd = cmd - p.rpc = rpc.NewRPC(writer, reader, p.Logger) - - return nil -} - -func (p *Plugin) Stop() { - p.Logger.Debug("plugin.Stop() triggered") - - go terminate(p) -} - -func terminate(p *Plugin) { - p.mu.Lock() - defer p.mu.Unlock() - - _ = p.rpc.Close() - p.Logger.Debug("termination the channel plugin") - timer := time.AfterFunc(5*time.Second, func() { - p.Logger.Debug("killing the channel plugin") - _ = p.cmd.Process.Kill() - }) - - p.cmd.Wait() - timer.Stop() - p.Logger.Debug("Stopped channel plugin successfully") -} - -func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) { - reader := bufio.NewReader(errPipe) - for { - line, err := reader.ReadString('\n') - if err != nil { - if err != io.EOF { - logger.Errorw("Failed to read stderr line", zap.Error(err)) - } - - return - } - - logger.Info(line) - } -} diff --git a/internal/channel/plugin.go b/internal/channel/plugin.go index 502083b62..df2158153 100644 --- a/internal/channel/plugin.go +++ b/internal/channel/plugin.go @@ -1,13 +1,69 @@ package channel import ( + "bufio" "encoding/json" "fmt" + "github.com/icinga/icinga-notifications/internal/daemonConfig" "github.com/icinga/icinga-notifications/pkg/plugin" + "github.com/icinga/icinga-notifications/pkg/rpc" + "go.uber.org/zap" + "io" + "os/exec" + "path/filepath" + "sync" + "time" ) +type Plugin struct { + cmd *exec.Cmd + rpc *rpc.RPC + mu sync.Mutex + Logger *zap.SugaredLogger +} + +func (p *Plugin) Start(pluginName string) error { + p.mu.Lock() + defer p.mu.Unlock() + + cmd := exec.Command(filepath.Join(daemonConfig.Config().ChannelPluginDir, pluginName)) + + writer, err := cmd.StdinPipe() + if err != nil { + return fmt.Errorf("failed to create stdin pipe: %w", err) + } + + reader, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to create stdout pipe: %w", err) + } + + errPipe, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("failed to create stderr pipe: %w", err) + } + + go forwardLogs(errPipe, p.Logger) + + if err = cmd.Start(); err != nil { + return fmt.Errorf("failed to start cmd: %w", err) + } + p.Logger.Info("Channel plugin started successfully") + + p.cmd = cmd + p.rpc = rpc.NewRPC(writer, reader, p.Logger) + + return nil +} + +func (p *Plugin) Stop() { + p.Logger.Debug("plugin.Stop() triggered") + + go terminate(p) +} + func (p *Plugin) GetInfo() (*plugin.Info, error) { - result, err := p.rpcCall(plugin.MethodGetInfo, nil) + result, err := p.rpc.Call(plugin.MethodGetInfo, nil) if err != nil { return nil, err } @@ -22,7 +78,7 @@ func (p *Plugin) GetInfo() (*plugin.Info, error) { } func (p *Plugin) SetConfig(config string) error { - _, err := p.rpcCall(plugin.MethodSetConfig, json.RawMessage(config)) + _, err := p.rpc.Call(plugin.MethodSetConfig, json.RawMessage(config)) return err } @@ -33,18 +89,39 @@ func (p *Plugin) SendNotification(req *plugin.NotificationRequest) error { return fmt.Errorf("failed to prepare request params: %w", err) } - _, err = p.rpcCall(plugin.MethodSendNotification, params) + _, err = p.rpc.Call(plugin.MethodSendNotification, params) return err } -func (p *Plugin) rpcCall(method string, params json.RawMessage) (json.RawMessage, error) { - result, err := p.rpc.Call(method, params) +func terminate(p *Plugin) { + p.mu.Lock() + defer p.mu.Unlock() + + _ = p.rpc.Close() + p.Logger.Debug("Stopping the channel plugin") + timer := time.AfterFunc(5*time.Second, func() { + p.Logger.Debug("killing the channel plugin") + _ = p.cmd.Process.Kill() + }) + + <-p.rpc.Done() + timer.Stop() + p.Logger.Debug("Stopped channel plugin successfully") +} + +func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) { + reader := bufio.NewReader(errPipe) + for { + line, err := reader.ReadString('\n') + if err != nil { + if err != io.EOF { + logger.Errorw("Failed to read stderr line", zap.Error(err)) + } - /*var rpcErr *rpc.Error - if errors.As(err, &rpcErr) { // not required, rpc.Done() is doing the job - p.Stop() - }*/ + return + } - return result, err + logger.Info(line) + } } diff --git a/internal/config/channel.go b/internal/config/channel.go index 8b1df089c..3570f8764 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -37,8 +37,10 @@ func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { if r.Channels != nil { // mark no longer existing channels for deletion - for typ := range r.Channels { + for typ, ch := range r.Channels { if _, ok := channelsByType[typ]; !ok { + ch.Logger.Info("Channel has been removed, stopping plugin") + ch.StopPlugin() channelsByType[typ] = nil } } @@ -69,13 +71,13 @@ func (r *RuntimeConfig) applyPendingChannels() { currentChannel.Logger.Info("Reloading the channel plugin config") if currentChannel.IsPluginStopped() { - go currentChannel.InitPlugin() + go currentChannel.RunPlugin() } else { currentChannel.ReloadConfig() } } } else { - go pendingChannel.InitPlugin() + go pendingChannel.RunPlugin() r.Channels[typ] = pendingChannel } }