Skip to content

Commit

Permalink
Limit number of reloads per time
Browse files Browse the repository at this point in the history
Limit the number of module reloads to once per 5s.
  • Loading branch information
Ulexus committed Mar 29, 2020
1 parent 2ca83c0 commit 9580f0a
Showing 1 changed file with 118 additions and 40 deletions.
158 changes: 118 additions & 40 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"archive/zip"
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -11,6 +12,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"

"github.com/CyCoreSystems/kubetemplate"
Expand All @@ -24,6 +26,7 @@ const renderFlagFilename = ".asterisk-config"

var maxShortDeaths = 10
var minRuntime = time.Minute
var defaultMinReloadInterval = 5 * time.Second

// Service maintains an Asterisk configuration set
type Service struct {
Expand Down Expand Up @@ -138,8 +141,13 @@ func main() {

// Run executes the Service
func (s *Service) Run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

renderChan := make(chan error, 1)

r := newReloader(ctx, ariUsername, s.Secret, s.Modules)

s.engine = kubetemplate.NewEngine(renderChan, s.Discoverer)
defer s.engine.Close()

Expand All @@ -158,6 +166,8 @@ func (s *Service) Run() error {
return errors.Wrap(err, "failed to write render flag file")
}

r.Reload()

s.engine.FirstRenderComplete(true)

// Wait for Asterisk to come up before proceeding, so as to not interrupt
Expand All @@ -182,9 +192,7 @@ func (s *Service) Run() error {
return errors.Wrap(err, "failed to render configuration")
}

if err := reload(ariUsername, s.Secret, s.Modules); err != nil {
return errors.Wrap(err, "failed to reload asterisk modules")
}
r.Reload()
}
}

Expand Down Expand Up @@ -311,43 +319,6 @@ func waitAsterisk(username, secret string) error {
}
}

func reload(username, secret, modules string) (err error) {
urlFormat := "http://127.0.0.1:8088/ari/asterisk/modules/%s"

for _, m := range strings.Split(modules, ",") {

var r *http.Request
r, err = http.NewRequest("PUT", fmt.Sprintf(urlFormat, m), nil)
if err != nil {
return errors.Wrapf(err, "failed to construct module reload request for module %s", m)
}
r.Header.Set("Content-Type", "application/json")
r.SetBasicAuth(username, secret)

ret, err := http.DefaultClient.Do(r)
if err != nil {
return errors.Wrapf(err, "failed to contact ARI to reload module %s", m)
}
ret.Body.Close() // nolint

switch ret.StatusCode {
case http.StatusNoContent:
continue
case http.StatusNotFound:
return errors.Errorf("module %s not already loaded", m)
case http.StatusUnauthorized:
return errors.Errorf("module %s failed to reload due bad authentication", m)
case 409:
return errors.Errorf("module %s could not be reloaded", m)
default:
return errors.Errorf("module %s reload failed: %s", m, ret.Status)
}
}

log.Println("reloads complete")
return nil
}

func extractSource(source, customRoot string) (err error) {
if strings.HasPrefix(source, "http") {
source, err = downloadSource(source)
Expand Down Expand Up @@ -434,3 +405,110 @@ func downloadSource(uri string) (string, error) {

return tf.Name(), err
}

type reloader struct {
lastReload time.Time
minReloadInterval time.Duration

username string
secret string

modules []string

needReload bool

mu sync.Mutex
}

func newReloader(ctx context.Context, username, secret, modules string) *reloader {
r := &reloader{
minReloadInterval: defaultMinReloadInterval,
username: username,
secret: secret,
}

for _, m := range strings.Split(modules, ",") {
r.modules = append(r.modules, strings.TrimSpace(m))
}

go r.run(ctx)

return r
}

func (r *reloader) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(r.minReloadInterval):
}

if err := r.maybeRunReload(); err != nil {
log.Println("failed to reload modules", err)
}
}
}

func (r *reloader) maybeRunReload() error {
r.mu.Lock()
defer r.mu.Unlock()

if r.needReload {
if err := r.reload(); err != nil {
return err
}

r.needReload = false
}

return nil
}

func (r *reloader) Reload() {
r.mu.Lock()
r.needReload = true
r.mu.Unlock()
}

func (r *reloader) reload() error {
log.Println("reloading Asterisk modules")
for _, m := range r.modules {
if err := r.reloadModule(m); err != nil {
return err
}
}
log.Println("Asterisk modules reloaded")

return nil
}

func (r *reloader) reloadModule(name string) error {
url := fmt.Sprintf("http://127.0.0.1:8088/ari/asterisk/modules/%s", name)

req, err := http.NewRequest("PUT", url, nil)
if err != nil {
return errors.Wrapf(err, "failed to construct module reload request for module %s", name)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(r.username, r.secret)

ret, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to contact ARI to reload module %s", name)
}
ret.Body.Close() // nolint

switch ret.StatusCode {
case http.StatusNoContent:
return nil
case http.StatusNotFound:
return errors.Errorf("module %s not already loaded", name)
case http.StatusUnauthorized:
return errors.Errorf("module %s failed to reload due bad authentication", name)
case 409:
return errors.Errorf("module %s could not be reloaded", name)
default:
return errors.Errorf("module %s reload failed: %s", name, ret.Status)
}
}

0 comments on commit 9580f0a

Please sign in to comment.