Skip to content

Commit

Permalink
WIP3
Browse files Browse the repository at this point in the history
  • Loading branch information
sukhwinder33445 committed Oct 16, 2023
1 parent 4672c77 commit 46cb339
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 104 deletions.
103 changes: 12 additions & 91 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
package channel

import (
"bufio"
"fmt"
"github.com/icinga/icinga-notifications/internal/contracts"
"github.com/icinga/icinga-notifications/internal/daemonConfig"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/pkg/plugin"
"github.com/icinga/icinga-notifications/pkg/rpc"
"go.uber.org/zap"
"io"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)

type Channel struct {
Expand All @@ -31,7 +23,7 @@ type Channel struct {
notificationCh chan *plugin.NotificationRequest
}

func (c *Channel) InitPlugin() {
func (c *Channel) RunPlugin() {
var currentlyRunningPlugin *Plugin

c.newConfigCh = make(chan string, 1)
Expand All @@ -43,13 +35,13 @@ func (c *Channel) InitPlugin() {
currentlyRunningPlugin = &Plugin{Logger: c.Logger}

if err := currentlyRunningPlugin.Start(c.Type); err != nil {
c.Logger.Errorw("Could not initialize channel plugin", zap.String("type", c.Type), zap.Error(err))
c.Logger.Errorw("Failed to initialize channel plugin", zap.String("type", c.Type), zap.Error(err))
return
}

if err := currentlyRunningPlugin.SetConfig(c.Config); err != nil {
c.Logger.Errorw("failed to set channel plugin config: %w", err)
close(c.stopPluginCh)
c.Logger.Errorw("Failed to set channel plugin config", err)
c.StopPlugin()
}
}

Expand Down Expand Up @@ -85,6 +77,14 @@ func (c *Channel) InitPlugin() {
}
}

func (c *Channel) StopPlugin() {
if c.IsPluginStopped() {
return
}

close(c.stopPluginCh)
}

func (c *Channel) ReloadConfig() {
c.newConfigCh <- c.Config
}
Expand Down Expand Up @@ -136,82 +136,3 @@ func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *e

c.notificationCh <- req
}

type Plugin struct {
cmd *exec.Cmd
rpc *rpc.RPC
mu sync.Mutex
Logger *zap.SugaredLogger
}

func (p *Plugin) Start(pluginName string) error {
p.mu.Lock()
defer p.mu.Unlock()

cmd := exec.Command(filepath.Join(daemonConfig.Config().ChannelPluginDir, pluginName))

writer, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("failed to create stdin pipe: %w", err)
}

reader, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to create stdout pipe: %w", err)
}

errPipe, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("failed to create stderr pipe: %w", err)
}

go forwardLogs(errPipe, p.Logger)

if err = cmd.Start(); err != nil {
return fmt.Errorf("failed to start cmd: %w", err)
}
p.Logger.Debug("Cmd started successfully")

p.cmd = cmd
p.rpc = rpc.NewRPC(writer, reader, p.Logger)

return nil
}

func (p *Plugin) Stop() {
p.Logger.Debug("plugin.Stop() triggered")

go terminate(p)
}

func terminate(p *Plugin) {
p.mu.Lock()
defer p.mu.Unlock()

_ = p.rpc.Close()
p.Logger.Debug("termination the channel plugin")
timer := time.AfterFunc(5*time.Second, func() {
p.Logger.Debug("killing the channel plugin")
_ = p.cmd.Process.Kill()
})

p.cmd.Wait()
timer.Stop()
p.Logger.Debug("Stopped channel plugin successfully")
}

func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) {
reader := bufio.NewReader(errPipe)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF {
logger.Errorw("Failed to read stderr line", zap.Error(err))
}

return
}

logger.Info(line)
}
}
97 changes: 87 additions & 10 deletions internal/channel/plugin.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,69 @@
package channel

import (
"bufio"
"encoding/json"
"fmt"
"github.com/icinga/icinga-notifications/internal/daemonConfig"
"github.com/icinga/icinga-notifications/pkg/plugin"
"github.com/icinga/icinga-notifications/pkg/rpc"
"go.uber.org/zap"
"io"
"os/exec"
"path/filepath"
"sync"
"time"
)

type Plugin struct {
cmd *exec.Cmd
rpc *rpc.RPC
mu sync.Mutex
Logger *zap.SugaredLogger
}

func (p *Plugin) Start(pluginName string) error {
p.mu.Lock()
defer p.mu.Unlock()

cmd := exec.Command(filepath.Join(daemonConfig.Config().ChannelPluginDir, pluginName))

writer, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("failed to create stdin pipe: %w", err)
}

reader, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to create stdout pipe: %w", err)
}

errPipe, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("failed to create stderr pipe: %w", err)
}

go forwardLogs(errPipe, p.Logger)

if err = cmd.Start(); err != nil {
return fmt.Errorf("failed to start cmd: %w", err)
}
p.Logger.Info("Channel plugin started successfully")

p.cmd = cmd
p.rpc = rpc.NewRPC(writer, reader, p.Logger)

return nil
}

func (p *Plugin) Stop() {
p.Logger.Debug("plugin.Stop() triggered")

go terminate(p)
}

func (p *Plugin) GetInfo() (*plugin.Info, error) {
result, err := p.rpcCall(plugin.MethodGetInfo, nil)
result, err := p.rpc.Call(plugin.MethodGetInfo, nil)
if err != nil {
return nil, err
}
Expand All @@ -22,7 +78,7 @@ func (p *Plugin) GetInfo() (*plugin.Info, error) {
}

func (p *Plugin) SetConfig(config string) error {
_, err := p.rpcCall(plugin.MethodSetConfig, json.RawMessage(config))
_, err := p.rpc.Call(plugin.MethodSetConfig, json.RawMessage(config))

return err
}
Expand All @@ -33,18 +89,39 @@ func (p *Plugin) SendNotification(req *plugin.NotificationRequest) error {
return fmt.Errorf("failed to prepare request params: %w", err)
}

_, err = p.rpcCall(plugin.MethodSendNotification, params)
_, err = p.rpc.Call(plugin.MethodSendNotification, params)

return err
}

func (p *Plugin) rpcCall(method string, params json.RawMessage) (json.RawMessage, error) {
result, err := p.rpc.Call(method, params)
func terminate(p *Plugin) {
p.mu.Lock()
defer p.mu.Unlock()

_ = p.rpc.Close()
p.Logger.Debug("Stopping the channel plugin")
timer := time.AfterFunc(5*time.Second, func() {
p.Logger.Debug("killing the channel plugin")
_ = p.cmd.Process.Kill()
})

<-p.rpc.Done()
timer.Stop()
p.Logger.Debug("Stopped channel plugin successfully")
}

func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) {
reader := bufio.NewReader(errPipe)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF {
logger.Errorw("Failed to read stderr line", zap.Error(err))
}

/*var rpcErr *rpc.Error
if errors.As(err, &rpcErr) { // not required, rpc.Done() is doing the job
p.Stop()
}*/
return
}

return result, err
logger.Info(line)
}
}
8 changes: 5 additions & 3 deletions internal/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error {

if r.Channels != nil {
// mark no longer existing channels for deletion
for typ := range r.Channels {
for typ, ch := range r.Channels {
if _, ok := channelsByType[typ]; !ok {
ch.Logger.Info("Channel has been removed, stopping plugin")
ch.StopPlugin()
channelsByType[typ] = nil
}
}
Expand Down Expand Up @@ -69,13 +71,13 @@ func (r *RuntimeConfig) applyPendingChannels() {

currentChannel.Logger.Info("Reloading the channel plugin config")
if currentChannel.IsPluginStopped() {
go currentChannel.InitPlugin()
go currentChannel.RunPlugin()
} else {
currentChannel.ReloadConfig()
}
}
} else {
go pendingChannel.InitPlugin()
go pendingChannel.RunPlugin()
r.Channels[typ] = pendingChannel
}
}
Expand Down

0 comments on commit 46cb339

Please sign in to comment.