Skip to content

Commit

Permalink
Fix missing consul keys (#22)
Browse files Browse the repository at this point in the history
Signed-off-by: Constantinos Christofilos <[email protected]>
  • Loading branch information
c0nstantx authored Jun 5, 2019
1 parent a926743 commit 771f3f4
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 38 deletions.
10 changes: 4 additions & 6 deletions harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Seeder interface {

// Monitor defines a interface for monitoring configuration changes from various sources.
type Monitor interface {
Monitor(ctx context.Context, chErr chan<- error) error
Monitor(ctx context.Context) error
}

// Harvester interface.
Expand All @@ -30,7 +30,6 @@ type harvester struct {
cfg *config.Config
seeder Seeder
monitor Monitor
chErr chan<- error
}

// Harvest take the configuration object, initializes it and monitors for changes.
Expand All @@ -42,7 +41,7 @@ func (h *harvester) Harvest(ctx context.Context) error {
if h.monitor == nil {
return nil
}
return h.monitor.Monitor(ctx, h.chErr)
return h.monitor.Monitor(ctx)
}

// Builder of a harvester instance.
Expand Down Expand Up @@ -104,16 +103,15 @@ func (b *Builder) Create() (Harvester, error) {
if b.err != nil {
return nil, b.err
}
chErr := make(chan<- error)
seed := seed.New(b.seedParams...)

var mon Monitor
if len(b.watchers) == 0 {
return &harvester{seeder: seed, chErr: chErr, cfg: b.cfg}, nil
return &harvester{seeder: seed, cfg: b.cfg}, nil
}
mon, err := monitor.New(b.cfg, b.watchers...)
if err != nil {
return nil, err
}
return &harvester{seeder: seed, monitor: mon, chErr: chErr, cfg: b.cfg}, nil
return &harvester{seeder: seed, monitor: mon, cfg: b.cfg}, nil
}
33 changes: 13 additions & 20 deletions monitor/consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func New(addr, dc, token string, timeout time.Duration, ii ...Item) (*Watcher, e
}

// Watch key and prefixes for changes.
func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change, chErr chan<- error) error {
func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
if ctx == nil {
return errors.New("context is nil")
}
Expand All @@ -79,9 +79,9 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change, chErr c
var err error
switch i.tp {
case "key":
pl, err = w.runKeyWatcher(i.key, ch, chErr)
pl, err = w.runKeyWatcher(i.key, ch)
case "keyprefix":
pl, err = w.runPrefixWatcher(i.key, ch, chErr)
pl, err = w.runPrefixWatcher(i.key, ch)
}
if err != nil {
return err
Expand All @@ -96,9 +96,6 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change, chErr c
logger := log.New(output, "", log.LstdFlags)
err := pl.RunWithClientAndLogger(w.cl, logger)
if err != nil {
if chErr != nil {
chErr <- err
}
harvesterlog.Errorf("plan %s of type %s failed: %v", tp, key, err)
}
}(i.tp, i.key)
Expand All @@ -113,42 +110,38 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change, chErr c
return nil
}

func (w *Watcher) runKeyWatcher(key string, ch chan<- []*change.Change, chErr chan<- error) (*watch.Plan, error) {
func (w *Watcher) runKeyWatcher(key string, ch chan<- []*change.Change) (*watch.Plan, error) {
pl, err := w.getPlan("key", key)
if err != nil {
return nil, err
}
pl.Handler = func(idx uint64, data interface{}) {
pair, ok := data.(*api.KVPair)
if !ok {
if chErr != nil {
chErr <- err
}
harvesterlog.Errorf("data is not kv pair: %v", data)
} else {
ch <- []*change.Change{change.New(config.SourceConsul, pair.Key, string(pair.Value), pair.ModifyIndex)}
}
ch <- []*change.Change{change.New(config.SourceConsul, pair.Key, string(pair.Value), pair.ModifyIndex)}
}
return pl, nil
}

func (w *Watcher) runPrefixWatcher(key string, ch chan<- []*change.Change, chErr chan<- error) (*watch.Plan, error) {
func (w *Watcher) runPrefixWatcher(key string, ch chan<- []*change.Change) (*watch.Plan, error) {
pl, err := w.getPlan("keyprefix", key)
if err != nil {
return nil, err
}
pl.Handler = func(idx uint64, data interface{}) {
pp, ok := data.(api.KVPairs)
if !ok {
if chErr != nil {
chErr <- err
}
harvesterlog.Errorf("data is not kv pairs: %v", data)
} else {
cc := make([]*change.Change, len(pp))
for i := 0; i < len(pp); i++ {
cc[i] = change.New(config.SourceConsul, pp[i].Key, string(pp[i].Value), pp[i].ModifyIndex)
}
ch <- cc
}
cc := make([]*change.Change, len(pp))
for i := 0; i < len(pp); i++ {
cc[i] = change.New(config.SourceConsul, pp[i].Key, string(pp[i].Value), pp[i].ModifyIndex)
}
ch <- cc
}
return pl, nil
}
Expand Down
3 changes: 1 addition & 2 deletions monitor/consul/watcher_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ func TestMain(m *testing.M) {

func TestWatch(t *testing.T) {
ch := make(chan []*change.Change)
chErr := make(chan error)
w, err := New(addr, "", "", 0, NewKeyItem("key1"), NewPrefixItem("prefix1"))
require.NoError(t, err)
require.NotNil(t, w)
ctx, cnl := context.WithCancel(context.Background())
defer cnl()
err = w.Watch(ctx, ch, chErr)
err = w.Watch(ctx, ch)
require.NoError(t, err)

for i := 0; i < 2; i++ {
Expand Down
3 changes: 1 addition & 2 deletions monitor/consul/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func TestNew(t *testing.T) {
func TestWatcher_Watch(t *testing.T) {
w, err := New("xxx", "", "", 0, Item{})
require.NoError(t, err)
chErr := make(chan error)
type args struct {
ctx context.Context
ch chan<- []*change.Change
Expand All @@ -59,7 +58,7 @@ func TestWatcher_Watch(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err = w.Watch(tt.args.ctx, tt.args.ch, chErr)
err = w.Watch(tt.args.ctx, tt.args.ch)
if tt.wantErr {
assert.Error(t, err)
} else {
Expand Down
6 changes: 3 additions & 3 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// Watcher interface definition.
type Watcher interface {
Watch(ctx context.Context, ch chan<- []*change.Change, chErr chan<- error) error
Watch(ctx context.Context, ch chan<- []*change.Change) error
}

type sourceMap map[config.Source]map[string]*config.Field
Expand Down Expand Up @@ -61,12 +61,12 @@ func generateMap(ff []*config.Field) (sourceMap, error) {
}

// Monitor configuration changes by starting watchers per source.
func (m *Monitor) Monitor(ctx context.Context, chErr chan<- error) error {
func (m *Monitor) Monitor(ctx context.Context) error {
ch := make(chan []*change.Change)
go m.monitor(ctx, ch)

for _, w := range m.ww {
err := w.Watch(ctx, ch, chErr)
err := w.Watch(ctx, ch)
if err != nil {
return err
}
Expand Down
8 changes: 3 additions & 5 deletions monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func TestMonitor_Monitor_Error(t *testing.T) {
watchers := []Watcher{&testWatcher{}, &testWatcher{err: true}}
mon, err := New(cfg, watchers...)
require.NoError(t, err)
chErr := make(chan error)
err = mon.Monitor(context.Background(), chErr)
err = mon.Monitor(context.Background())
assert.Error(t, err)
}

Expand All @@ -66,9 +65,8 @@ func TestMonitor_Monitor(t *testing.T) {
watchers := []Watcher{&testWatcher{}}
mon, err := New(cfg, watchers...)
require.NoError(t, err)
chErr := make(chan error)
ctx, cnl := context.WithCancel(context.Background())
err = mon.Monitor(ctx, chErr)
err = mon.Monitor(ctx)
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)
cnl()
Expand All @@ -88,7 +86,7 @@ type testWatcher struct {
err bool
}

func (tw *testWatcher) Watch(ctx context.Context, ch chan<- []*change.Change, chErr chan<- error) error {
func (tw *testWatcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
if tw.err {
return errors.New("TEST")
}
Expand Down

0 comments on commit 771f3f4

Please sign in to comment.