Skip to content

Commit

Permalink
WIP2
Browse files Browse the repository at this point in the history
  • Loading branch information
sukhwinder33445 committed Oct 16, 2023
1 parent 6ea1198 commit 4672c77
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 101 deletions.
6 changes: 5 additions & 1 deletion cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func syncPlugins(channelPluginDir string, logs *logging.Logging, db *icingadb.DB

for _, file := range files {
p := channel.Plugin{
Logger: logger.With(zap.String("plugin", file.Name())),
Logger: logger.With(zap.String("name", file.Name())),
}
err := p.Start(file.Name())
if err != nil {
Expand All @@ -115,6 +115,7 @@ func syncPlugins(channelPluginDir string, logs *logging.Logging, db *icingadb.DB
info, err := p.GetInfo()
if err != nil {
p.Logger.Error(err)
p.Stop()
continue
}
p.Stop()
Expand All @@ -132,15 +133,18 @@ func syncPlugins(channelPluginDir string, logs *logging.Logging, db *icingadb.DB
defer func() { _ = tx.Rollback() }()
if err != nil {
logger.Error("failed to start channel plugin database transaction: ", err)
return
}

stmt, _ := db.BuildUpsertStmt(&plugin.Info{})
_, err = tx.NamedExecContext(ctx, stmt, PluginInfos)
if err != nil {
logger.Error("failed to upsert channel plugin: ", err)
return
}
if err = tx.Commit(); err != nil {
logger.Error("can't commit channel plugin transaction: ", err)
return
}

logger.Infof("Successfully upsert %d plugins", len(PluginInfos))
Expand Down
143 changes: 107 additions & 36 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package channel

import (
"bufio"
"errors"
"fmt"
"github.com/icinga/icinga-notifications/internal/contracts"
"github.com/icinga/icinga-notifications/internal/daemonConfig"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/pkg/plugin"
"github.com/icinga/icinga-notifications/pkg/rpc"
"go.uber.org/zap"
"io"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
Expand All @@ -22,32 +26,115 @@ type Channel struct {

Logger *zap.SugaredLogger

Plugin *Plugin
newConfigCh chan string
stopPluginCh chan struct{}
notificationCh chan *plugin.NotificationRequest
}

func (c *Channel) InitPlugin() {
var currentlyRunningPlugin *Plugin

c.newConfigCh = make(chan string, 1)
c.stopPluginCh = make(chan struct{})
c.notificationCh = make(chan *plugin.NotificationRequest, 1)

for {
plugin := &Plugin{Logger: c.Logger}
if currentlyRunningPlugin == nil {
currentlyRunningPlugin = &Plugin{Logger: c.Logger}

if err := plugin.Start(c.Type); err != nil {
plugin.Logger.Errorw("Could not initialize channel plugin", zap.String("type", c.Type), zap.Error(err))
c.Plugin = nil
return
if err := currentlyRunningPlugin.Start(c.Type); err != nil {
c.Logger.Errorw("Could not initialize channel plugin", zap.String("type", c.Type), zap.Error(err))
return
}

if err := currentlyRunningPlugin.SetConfig(c.Config); err != nil {
c.Logger.Errorw("failed to set channel plugin config: %w", err)
close(c.stopPluginCh)
}
}

if err := plugin.SetConfig(c.Config); err != nil {
plugin.Stop()
plugin.Logger.Errorw("failed to set channel plugin config: %w", err)
c.Plugin = nil
select {
case <-currentlyRunningPlugin.rpc.Done():
if currentlyRunningPlugin.rpc.RequestedShutdown() {
go terminate(currentlyRunningPlugin)
}

currentlyRunningPlugin = nil

continue
case <-c.newConfigCh:
go terminate(currentlyRunningPlugin)
currentlyRunningPlugin = nil

continue
case <-c.stopPluginCh:
go terminate(currentlyRunningPlugin)
currentlyRunningPlugin = nil

return
case req := <-c.notificationCh:
err := currentlyRunningPlugin.SendNotification(req)
if err != nil {
c.Logger.Errorw("Failed to send notification via channel plugin", zap.String("type", c.Type), zap.Error(err))
} else {
c.Logger.Infow(
"Successfully sent a notification via channel plugin", zap.String("type", c.Type), zap.String("contact", req.Contact.FullName),
)
}
}
}
}

func (c *Channel) ReloadConfig() {
c.newConfigCh <- c.Config
}

func (c *Channel) IsPluginStopped() bool {
select {
case <-c.stopPluginCh:
return true
default:
return false
}
}

c.Plugin = plugin
func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) {
if c.IsPluginStopped() {
c.Logger.Error("Cannot send notification, plugin is not running")
return
}

contactStruct := &plugin.Contact{FullName: contact.FullName}
for _, addr := range contact.Addresses {
contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address})
}

if !strings.HasSuffix(icingaweb2Url, "/") {
icingaweb2Url += "/"
}

<-c.Plugin.rpc.Done()
c.Plugin.Logger.Debug("plugin exited, restarting")
req := &plugin.NotificationRequest{
Contact: contactStruct,
Object: &plugin.Object{
Name: i.ObjectDisplayName(),
Url: ev.URL,
Tags: ev.Tags,
ExtraTags: ev.ExtraTags,
},
Incident: &plugin.Incident{
Id: i.ID(),
Url: fmt.Sprintf("%snotifications/incident?id=%d", icingaweb2Url, i.ID()),
},
Event: &plugin.Event{
Time: ev.Time,
Type: ev.Type,
Severity: ev.Severity.String(),
Username: ev.Username,
Message: ev.Message,
},
}

c.notificationCh <- req
}

type Plugin struct {
Expand Down Expand Up @@ -88,44 +175,28 @@ func (p *Plugin) Start(pluginName string) error {
p.cmd = cmd
p.rpc = rpc.NewRPC(writer, reader, p.Logger)

go p.wait()

return nil
}

func (p *Plugin) wait() {
err := p.cmd.Wait()

if p.rpc != nil {
p.rpc.SetErr(errors.New("channel plugin terminated"))
}
func (p *Plugin) Stop() {
p.Logger.Debug("plugin.Stop() triggered")

p.Logger.Debug("plugin.wait() triggered", err)
go terminate(p)
}

func (p *Plugin) Stop() {
func terminate(p *Plugin) {
p.mu.Lock()
defer p.mu.Unlock()

p.Logger.Debug("plugin.Stop() triggered")
if p.cmd == nil {
p.Logger.Debug("channel plugin has already been stopped")
return
}

_ = p.rpc.Close()

p.Logger.Debug("termination the channel plugin")
timer := time.AfterFunc(5*time.Second, func() {
p.Logger.Debug("killing the channel plugin")
_ = p.cmd.Process.Kill()
})

<-p.rpc.Done()
p.cmd.Wait()
timer.Stop()

p.cmd = nil
p.rpc = nil

p.Logger.Debug("Stopped channel plugin successfully")
}

Expand Down
44 changes: 4 additions & 40 deletions internal/channel/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@ package channel

import (
"encoding/json"
"errors"
"fmt"
"github.com/icinga/icinga-notifications/internal/contracts"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/pkg/plugin"
"github.com/icinga/icinga-notifications/pkg/rpc"
"strings"
)

func (p *Plugin) GetInfo() (*plugin.Info, error) {
Expand All @@ -33,37 +27,7 @@ func (p *Plugin) SetConfig(config string) error {
return err
}

func (p *Plugin) SendNotification(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error {
contactStruct := &plugin.Contact{FullName: contact.FullName}
for _, addr := range contact.Addresses {
contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address})
}

if !strings.HasSuffix(icingaweb2Url, "/") {
icingaweb2Url += "/"
}

req := plugin.NotificationRequest{
Contact: contactStruct,
Object: &plugin.Object{
Name: i.ObjectDisplayName(),
Url: ev.URL,
Tags: ev.Tags,
ExtraTags: ev.ExtraTags,
},
Incident: &plugin.Incident{
Id: i.ID(),
Url: fmt.Sprintf("%snotifications/incident?id=%d", icingaweb2Url, i.ID()),
},
Event: &plugin.Event{
Time: ev.Time,
Type: ev.Type,
Severity: ev.Severity.String(),
Username: ev.Username,
Message: ev.Message,
},
}

func (p *Plugin) SendNotification(req *plugin.NotificationRequest) error {
params, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("failed to prepare request params: %w", err)
Expand All @@ -77,10 +41,10 @@ func (p *Plugin) SendNotification(contact *recipient.Contact, i contracts.Incide
func (p *Plugin) rpcCall(method string, params json.RawMessage) (json.RawMessage, error) {
result, err := p.rpc.Call(method, params)

var rpcErr *rpc.Error
if errors.As(err, &rpcErr) {
/*var rpcErr *rpc.Error
if errors.As(err, &rpcErr) { // not required, rpc.Done() is doing the job
p.Stop()
}
}*/

return result, err
}
9 changes: 4 additions & 5 deletions internal/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ func (r *RuntimeConfig) applyPendingChannels() {
currentChannel.Name = pendingChannel.Name
currentChannel.Config = pendingChannel.Config

if currentChannel.Plugin != nil {
currentChannel.Logger.Info("Stopping the channel plugin because the config has been changed")
currentChannel.Plugin.Stop()
} else {
// If the plugin could not start and is not initialised to currentChannel
currentChannel.Logger.Info("Reloading the channel plugin config")
if currentChannel.IsPluginStopped() {
go currentChannel.InitPlugin()
} else {
currentChannel.ReloadConfig()
}
}
} else {
Expand Down
15 changes: 1 addition & 14 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,7 @@ func (i *Incident) notifyContacts(ctx context.Context, tx *sqlx.Tx, ev *event.Ev
continue
}

if ch.Plugin == nil {
i.logger.Errorw("Plugin could not be started", zap.String("type", chType), zap.Error(err))
continue
}

err = ch.Plugin.SendNotification(contact, i, ev, daemonConfig.Config().Icingaweb2URL)
if err != nil {
i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", chType), zap.Error(err))
continue
}

i.logger.Infow(
"Successfully sent a notification via channel plugin", zap.String("type", chType), zap.String("contact", contact.String()),
)
ch.Notify(contact, i, ev, daemonConfig.Config().Icingaweb2URL)
}
}

Expand Down
14 changes: 9 additions & 5 deletions pkg/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type RPC struct {
lastRequestId uint64
requestsMu sync.Mutex

errChannel chan struct{} // never transports a value, only closed through SetErr() to signal an occurred error
err *Error // only initialized via SetErr(), if a rpc (Fatal/non-recoverable) error has occurred
errChannel chan struct{} // never transports a value, only closed through setErr() to signal an occurred error
err *Error // only initialized via setErr(), if a rpc (Fatal/non-recoverable) error has occurred
errMu sync.Mutex
requestedShutdown bool
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (r *RPC) Call(method string, params json.RawMessage) (json.RawMessage, erro
r.encoderMu.Unlock()
if err != nil {
err = fmt.Errorf("failed to write request: %w", err)
r.SetErr(err)
r.setErr(err)

return nil, r.Err()
}
Expand Down Expand Up @@ -122,6 +122,10 @@ func (r *RPC) Done() <-chan struct{} {
return r.errChannel
}

func (r *RPC) RequestedShutdown() bool {
return r.requestedShutdown
}

func (r *RPC) Close() error {
r.encoderMu.Lock()
defer r.encoderMu.Unlock()
Expand All @@ -130,7 +134,7 @@ func (r *RPC) Close() error {
return r.writer.Close()
}

func (r *RPC) SetErr(err error) {
func (r *RPC) setErr(err error) {
r.errMu.Lock()
defer r.errMu.Unlock()

Expand All @@ -155,7 +159,7 @@ func (r *RPC) processResponses() {
err := r.decoder.Decode(&response)

if err != nil {
r.SetErr(fmt.Errorf("failed to read json response: %w", err))
r.setErr(fmt.Errorf("failed to read json response: %w", err))

return
}
Expand Down

0 comments on commit 4672c77

Please sign in to comment.