Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
sukhwinder33445 committed Oct 10, 2023
1 parent 4fc2897 commit 7bac817
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 54 deletions.
70 changes: 69 additions & 1 deletion cmd/channel/email/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,75 @@ func (ch *Email) SetConfig(jsonStr json.RawMessage) error {
}

func (ch *Email) GetInfo() *plugin.Info {
return &plugin.Info{Name: "Email"}
var elements []*plugin.FormElement
elements = append(elements,
&plugin.FormElement{
Name: "host",
Type: "text",
Options: map[string]string{
"label": "SMTP Host",
"placeholder": "localhost",
},
},
&plugin.FormElement{
Name: "port",
Type: "select",
Options: map[string]string{
"label": "SMTP Port",
"options": `{"25" : "25"," 465" :"465", "587" : "587", "2525" : "2525"}`,
},
},
&plugin.FormElement{
Name: "from",
Type: "text",
Options: map[string]string{
"auto-complete": "off",
"label": "From",
},
},
&plugin.FormElement{
Name: "password",
Type: "password",
Options: map[string]string{
"auto-complete": "off",
"label": "Password",
},
},
&plugin.FormElement{
Name: "tls",
Type: "checkbox",
Options: map[string]string{
"label": "TLS / SSL",
"class": "autosubmit",
"checkedValue": "1",
"uncheckedValue": "0",
"value": "1",
},
},
&plugin.FormElement{
Name: "tls_certcheck",
Type: "checkbox",
Options: map[string]string{
"label": "Certificate Check",
"class": "autosubmit",
"checkedValue": "1",
"uncheckedValue": "0",
"value": "0",
},
},
)

marshal, err := json.Marshal(elements)
if err != nil {
return nil
}

return &plugin.Info{
Name: "Email",
Version: "0.0",
AuthorName: "",
ConfigAttributes: marshal,
}
}

func (ch *Email) GetServer() string {
Expand Down
42 changes: 41 additions & 1 deletion cmd/channel/rocketchat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,45 @@ func (ch *RocketChat) SetConfig(jsonStr json.RawMessage) error {
}

func (ch *RocketChat) GetInfo() *plugin.Info {
return &plugin.Info{Name: "Rocket.Chat"}
var elements []*plugin.FormElement
elements = append(elements,
&plugin.FormElement{
Name: "url",
Type: "text",
Options: map[string]string{
"required": "true",
"label": "Rocket.Chat URL",
},
},
&plugin.FormElement{
Name: "user_id",
Type: "text",
Options: map[string]string{
"required": "true",
"auto-complete": "off",
"label": "User ID",
},
},
&plugin.FormElement{
Name: "token",
Type: "password",
Options: map[string]string{
"required": "true",
"auto-complete": "off",
"label": "Personal Access Token",
},
},
)

marshal, err := json.Marshal(elements)
if err != nil {
return nil
}

return &plugin.Info{
Name: "Rocket.Chat",
Version: "0.0",
AuthorName: "",
ConfigAttributes: marshal,
}
}
49 changes: 49 additions & 0 deletions cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"flag"
"fmt"
"github.com/icinga/icinga-notifications/internal"
"github.com/icinga/icinga-notifications/internal/channel"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/listener"
"github.com/icinga/icinga-notifications/pkg/plugin"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"log"
"os"
"runtime"
"time"
Expand Down Expand Up @@ -73,6 +77,8 @@ func main() {
}
}

syncPlugins(conf.ChannelPluginDir, logs, db)

runtimeConfig := config.NewRuntimeConfig(db, logs)
if err := runtimeConfig.UpdateFromDatabase(context.TODO()); err != nil {
logger.Fatalw("failed to load config from database", zap.Error(err))
Expand All @@ -84,3 +90,46 @@ func main() {
panic(err)
}
}

func syncPlugins(channelPluginDir string, logs *logging.Logging, db *icingadb.DB) {
files, err := os.ReadDir(channelPluginDir)
if err != nil {
log.Fatal(err)
}

var PluginInfos []*plugin.Info

for _, file := range files {
p := channel.Plugin{
Logger: logs.GetChildLogger("plugin initialise").With(zap.String("name", file.Name())),
}
err := p.StartPlugin(file.Name())

Check failure on line 106 in cmd/icinga-notifications-daemon/main.go

View workflow job for this annotation

GitHub Actions / build

p.StartPlugin undefined (type channel.Plugin has no field or method StartPlugin)
if err != nil {
return
}

info, err := p.GetInfo()
if err != nil {
return
}
p.ResetPlugin()

Check failure on line 115 in cmd/icinga-notifications-daemon/main.go

View workflow job for this annotation

GitHub Actions / build

p.ResetPlugin undefined (type channel.Plugin has no field or method ResetPlugin)

PluginInfos = append(PluginInfos, info)
}
ctx := context.Background()
tx, err := db.BeginTxx(ctx, nil)
defer func() { _ = tx.Rollback() }()
if err != nil {
log.Fatal("failed to start channel plugin database transaction: %w", err)
}

stmt, _ := db.BuildUpsertStmt(&plugin.Info{})
println(stmt)
_, err = tx.NamedExecContext(ctx, stmt, PluginInfos)
if err != nil {
log.Fatal("failed to upsert channel plugin: %s", err)
}
if err = tx.Commit(); err != nil {
log.Fatal("can't commit channel plugin transaction: %w", err)
}
}
67 changes: 30 additions & 37 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"os/exec"
"path/filepath"
"regexp"
"sync"
"time"
)
Expand All @@ -20,26 +19,30 @@ type Channel struct {
Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information

Logger *zap.SugaredLogger
}

type Plugin struct {
cmd *exec.Cmd
rpc *rpc.RPC
mu sync.Mutex
Logger *zap.SugaredLogger
}

func (c *Channel) Start(pluginDir string) error {
c.mu.Lock()
defer c.mu.Unlock()
func (p *Plugin) Start(pluginPath string) error {
p.mu.Lock()
defer p.mu.Unlock()

if c.cmd != nil && c.rpc.Err() == nil {
if p.cmd != nil && p.rpc.Err() == nil {
return nil
}

if matched, _ := regexp.MatchString("^[a-zA-Z0-9]*$", c.Type); !matched {
return fmt.Errorf("channel type must only contain a-zA-Z0-9, %q given", c.Type)
}
/*if matched, _ := regexp.MatchString("^[a-zA-Z0-9]*$", p.Type); !matched {
return fmt.Errorf("channel type must only contain a-zA-Z0-9, %q given", p.Type)
}*/

path := filepath.Join(pluginDir, c.Type)
pluginPath = filepath.Join("/usr/libexec/icinga-notifications/channel", pluginPath)

cmd := exec.Command(path)
cmd := exec.Command(pluginPath)

writer, err := cmd.StdinPipe()
if err != nil {
Expand All @@ -56,44 +59,34 @@ func (c *Channel) Start(pluginDir string) error {
return fmt.Errorf("failed to create stderr pipe: %w", err)
}

go forwardLogs(errPipe, c.Logger)
go forwardLogs(errPipe, p.Logger)

if err = cmd.Start(); err != nil {
return fmt.Errorf("failed to start cmd: %w", err)
}
c.Logger.Debug("Cmd started successfully")
p.Logger.Debug("Cmd started successfully")

c.cmd = cmd
c.rpc = rpc.NewRPC(writer, reader, c.Logger)

if err = c.SetConfig(c.Config); err != nil {
go c.terminate(c.cmd, c.rpc)

c.cmd = nil
c.rpc = nil

return fmt.Errorf("failed to set config: %w", err)
}
c.Logger.Debugw("Successfully set config", zap.String("config", c.Config))
p.cmd = cmd
p.rpc = rpc.NewRPC(writer, reader, p.Logger)

return nil
}

func (c *Channel) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
func (p *Plugin) Stop() {
p.mu.Lock()
defer p.mu.Unlock()

if c.cmd == nil {
c.Logger.Debug("channel plugin has already been stopped")
if p.cmd == nil {
p.Logger.Debug("channel plugin has already been stopped")
return
}

go c.terminate(c.cmd, c.rpc)
go p.terminate(p.cmd, p.rpc)

c.cmd = nil
c.rpc = nil
p.cmd = nil
p.rpc = nil

c.Logger.Debug("Stopped channel plugin successfully")
p.Logger.Debug("Stopped channel plugin successfully")
}

func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) {
Expand All @@ -113,16 +106,16 @@ func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) {
}

// run as go routine to terminate given channel
func (c *Channel) terminate(cmd *exec.Cmd, rpc *rpc.RPC) {
c.Logger.Debug("terminating channel plugin")
func (p *Plugin) terminate(cmd *exec.Cmd, rpc *rpc.RPC) {
p.Logger.Debug("terminating channel plugin")
_ = rpc.Close()

timer := time.AfterFunc(5*time.Second, func() {
c.Logger.Debug("killing the channel plugin")
p.Logger.Debug("killing the channel plugin")
_ = cmd.Process.Kill()
})

_ = cmd.Wait()
timer.Stop()
c.Logger.Debug("Channel plugin terminated successfully")
p.Logger.Debug("Channel plugin terminated successfully")
}
18 changes: 9 additions & 9 deletions internal/channel/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"strings"
)

func (c *Channel) GetInfo() (*plugin.Info, error) {
result, err := c.rpcCall(plugin.MethodGetInfo, nil)
func (p *Plugin) GetInfo() (*plugin.Info, error) {
result, err := p.rpcCall(plugin.MethodGetInfo, nil)
if err != nil {
return nil, err
}
Expand All @@ -27,13 +27,13 @@ func (c *Channel) GetInfo() (*plugin.Info, error) {
return info, nil
}

func (c *Channel) SetConfig(config string) error {
_, err := c.rpcCall(plugin.MethodSetConfig, json.RawMessage(config))
func (p *Plugin) SetConfig(config string) error {
_, err := p.rpcCall(plugin.MethodSetConfig, json.RawMessage(config))

return err
}

func (c *Channel) SendNotification(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error {
func (p *Plugin) SendNotification(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error {
contactStruct := &plugin.Contact{FullName: contact.FullName}
for _, addr := range contact.Addresses {
contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address})
Expand Down Expand Up @@ -69,17 +69,17 @@ func (c *Channel) SendNotification(contact *recipient.Contact, i contracts.Incid
return fmt.Errorf("failed to prepare request params: %w", err)
}

_, err = c.rpcCall(plugin.MethodSendNotification, params)
_, err = p.rpcCall(plugin.MethodSendNotification, params)

return err
}

func (c *Channel) rpcCall(method string, params json.RawMessage) (json.RawMessage, error) {
result, err := c.rpc.Call(method, params)
func (p *Plugin) rpcCall(method string, params json.RawMessage) (json.RawMessage, error) {
result, err := p.rpc.Call(method, params)

var rpcErr *rpc.Error
if errors.As(err, &rpcErr) {
c.Stop()
p.Stop()
}

return result, err
Expand Down
2 changes: 1 addition & 1 deletion internal/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *RuntimeConfig) applyPendingChannels() {
currentChannel.Config = pendingChannel.Config

currentChannel.Logger.Info("Stopping the channel plugin because the config has been changed")
currentChannel.Stop()
//currentChannel.Stop()
}
} else {
r.Channels[typ] = pendingChannel
Expand Down
Loading

0 comments on commit 7bac817

Please sign in to comment.