Skip to content

Commit

Permalink
Notification Addition (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sotirios Mantziaris authored Jan 4, 2021
1 parent 33868aa commit 7f03dbb
Show file tree
Hide file tree
Showing 16 changed files with 416 additions and 207 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ The `Harvester` builder pattern is used to create a `Harvester` instance. The bu

The above snippet set's up a `Harvester` instance with consul seed and monitor.

## Notification support

In order to be able to monitor the changes in the configuration we provide a way to notify when a change is happening via the builder.

```go
h, err := harvester.New(&cfg).WithNotification(chNotify).Create()
...
```

## Consul

Consul has support for versioning (`ModifyIndex`) which allows us to change the value only if the version is higher than the one currently.
Expand Down
35 changes: 32 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,37 @@ type CfgType interface {
SetString(string) error
}

// ChangeNotification definition for a configuration change.
type ChangeNotification struct {
Name string
Type string
Previous string
Current string
}

func (n ChangeNotification) String() string {
return fmt.Sprintf("field [%s] of type [%s] changed from [%s] to [%s]", n.Name, n.Type, n.Previous, n.Current)
}

// Field definition of a config value that can change.
type Field struct {
name string
tp string
version uint64
structField CfgType
sources map[Source]string
chNotify chan<- ChangeNotification
}

// newField constructor.
func newField(prefix string, fld reflect.StructField, val reflect.Value) *Field {
func newField(prefix string, fld reflect.StructField, val reflect.Value, chNotify chan<- ChangeNotification) *Field {
f := &Field{
name: prefix + fld.Name,
tp: fld.Type.Name(),
version: 0,
structField: val.Addr().Interface().(CfgType),
sources: make(map[Source]string),
chNotify: chNotify,
}

for _, tag := range sourceTags {
Expand Down Expand Up @@ -94,27 +108,42 @@ func (f *Field) Set(value string, version uint64) error {
return nil
}

prevValue := f.structField.String()

if err := f.structField.SetString(value); err != nil {
return err
}

f.version = version
log.Infof("field %q updated with value %q, version: %d", f.name, f, version)
f.sendNotification(prevValue, value)
return nil
}

func (f *Field) sendNotification(prev string, current string) {
if f.chNotify == nil {
return
}
f.chNotify <- ChangeNotification{
Name: f.name,
Type: f.tp,
Previous: prev,
Current: current,
}
}

// Config manages configuration and handles updates on the values.
type Config struct {
Fields []*Field
}

// New creates a new monitor.
func New(cfg interface{}) (*Config, error) {
func New(cfg interface{}, chNotify chan<- ChangeNotification) (*Config, error) {
if cfg == nil {
return nil, errors.New("configuration is nil")
}

ff, err := newParser().ParseCfg(cfg)
ff, err := newParser().ParseCfg(cfg, chNotify)
if err != nil {
return nil, err
}
Expand Down
63 changes: 37 additions & 26 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,29 @@ import (

func TestField_Set(t *testing.T) {
c := testConfig{}
cfg, err := New(&c)
cfg, err := New(&c, nil)
require.NoError(t, err)
cfg.Fields[0].version = 2
type args struct {
value string
version uint64
}
tests := []struct {
name string
tests := map[string]struct {
field Field
args args
wantErr bool
}{
{name: "success String", field: *cfg.Fields[0], args: args{value: "John Doe", version: 3}, wantErr: false},
{name: "success Int64", field: *cfg.Fields[1], args: args{value: "18", version: 1}, wantErr: false},
{name: "success Float64", field: *cfg.Fields[2], args: args{value: "99.9", version: 1}, wantErr: false},
{name: "success Bool", field: *cfg.Fields[3], args: args{value: "true", version: 1}, wantErr: false},
{name: "failure Int64", field: *cfg.Fields[1], args: args{value: "XXX", version: 1}, wantErr: true},
{name: "failure Float64", field: *cfg.Fields[2], args: args{value: "XXX", version: 1}, wantErr: true},
{name: "failure Bool", field: *cfg.Fields[3], args: args{value: "XXX", version: 1}, wantErr: true},
{name: "warn String version older", field: *cfg.Fields[0], args: args{value: "John Doe", version: 2}, wantErr: false},
"success String": {field: *cfg.Fields[0], args: args{value: "John Doe", version: 3}, wantErr: false},
"success Int64": {field: *cfg.Fields[1], args: args{value: "18", version: 1}, wantErr: false},
"success Float64": {field: *cfg.Fields[2], args: args{value: "99.9", version: 1}, wantErr: false},
"success Bool": {field: *cfg.Fields[3], args: args{value: "true", version: 1}, wantErr: false},
"failure Int64": {field: *cfg.Fields[1], args: args{value: "XXX", version: 1}, wantErr: true},
"failure Float64": {field: *cfg.Fields[2], args: args{value: "XXX", version: 1}, wantErr: true},
"failure Bool": {field: *cfg.Fields[3], args: args{value: "XXX", version: 1}, wantErr: true},
"warn String version older": {field: *cfg.Fields[0], args: args{value: "John Doe", version: 2}, wantErr: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
err := tt.field.Set(tt.args.value, tt.args.version)
if tt.wantErr {
assert.Error(t, err)
Expand All @@ -48,22 +47,21 @@ func TestNew(t *testing.T) {
type args struct {
cfg interface{}
}
tests := []struct {
name string
tests := map[string]struct {
args args
wantErr bool
}{
{name: "success", args: args{cfg: &testConfig{}}, wantErr: false},
{name: "cfg is nil", args: args{cfg: nil}, wantErr: true},
{name: "cfg is not pointer", args: args{cfg: testConfig{}}, wantErr: true},
{name: "cfg field not supported", args: args{cfg: &testInvalidTypeConfig{}}, wantErr: true},
{name: "cfg duplicate consul key", args: args{cfg: &testDuplicateConfig{}}, wantErr: true},
{name: "cfg tagged struct not supported", args: args{cfg: &testInvalidNestedStructWithTags{}}, wantErr: true},
{name: "cfg nested duplicate consul key", args: args{cfg: &testDuplicateNestedConsulConfig{}}, wantErr: true},
"success": {args: args{cfg: &testConfig{}}, wantErr: false},
"cfg is nil": {args: args{cfg: nil}, wantErr: true},
"cfg is not pointer": {args: args{cfg: testConfig{}}, wantErr: true},
"cfg field not supported": {args: args{cfg: &testInvalidTypeConfig{}}, wantErr: true},
"cfg duplicate consul key": {args: args{cfg: &testDuplicateConfig{}}, wantErr: true},
"cfg tagged struct not supported": {args: args{cfg: &testInvalidNestedStructWithTags{}}, wantErr: true},
"cfg nested duplicate consul key": {args: args{cfg: &testDuplicateNestedConsulConfig{}}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(tt.args.cfg)
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
got, err := New(tt.args.cfg, nil)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
Expand Down Expand Up @@ -97,20 +95,33 @@ func assertField(t *testing.T, fld *Field, name, typ string, sources map[Source]

func TestConfig_Set(t *testing.T) {
c := testConfig{}
cfg, err := New(&c)
chNotify := make(chan ChangeNotification, 1)
cfg, err := New(&c, chNotify)
require.NoError(t, err)
err = cfg.Fields[0].Set("John Doe", 1)
assert.NoError(t, err)
change := <-chNotify
assert.Equal(t, "field [Name] of type [String] changed from [] to [John Doe]", change.String())
err = cfg.Fields[1].Set("18", 1)
assert.NoError(t, err)
change = <-chNotify
assert.Equal(t, "field [Age] of type [Int64] changed from [0] to [18]", change.String())
err = cfg.Fields[2].Set("99.9", 1)
assert.NoError(t, err)
change = <-chNotify
assert.Equal(t, "field [Balance] of type [Float64] changed from [0.000000] to [99.9]", change.String())
err = cfg.Fields[3].Set("true", 1)
assert.NoError(t, err)
change = <-chNotify
assert.Equal(t, "field [HasJob] of type [Bool] changed from [false] to [true]", change.String())
err = cfg.Fields[4].Set("6000", 1)
assert.NoError(t, err)
change = <-chNotify
assert.Equal(t, "field [PositionSalary] of type [Int64] changed from [0] to [6000]", change.String())
err = cfg.Fields[5].Set("baz", 1)
assert.NoError(t, err)
change = <-chNotify
assert.Equal(t, "field [LevelOneLevelTwoDeepField] of type [String] changed from [] to [baz]", change.String())
assert.Equal(t, "John Doe", c.Name.Get())
assert.Equal(t, int64(18), c.Age.Get())
assert.Equal(t, 99.9, c.Balance.Get())
Expand Down
4 changes: 2 additions & 2 deletions config/custom_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestCustomField(t *testing.T) {
c := &testConfig{}
cfg, err := config.New(c)
cfg, err := config.New(c, nil)
assert.NoError(t, err)
err = cfg.Fields[0].Set("expected", 1)
assert.NoError(t, err)
Expand All @@ -24,7 +24,7 @@ func TestCustomField(t *testing.T) {

func TestErrorValidationOnCustomField(t *testing.T) {
c := &testConfig{}
cfg, err := config.New(c)
cfg, err := config.New(c, nil)
assert.NoError(t, err)
err = cfg.Fields[0].Set("not_expected", 1)
assert.Error(t, err)
Expand Down
14 changes: 7 additions & 7 deletions config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ func newParser() *parser {
return &parser{}
}

func (p *parser) ParseCfg(cfg interface{}) ([]*Field, error) {
func (p *parser) ParseCfg(cfg interface{}, chNotify chan<- ChangeNotification) ([]*Field, error) {
p.dups = make(map[Source]string)

tp := reflect.TypeOf(cfg)
if tp.Kind() != reflect.Ptr {
return nil, errors.New("configuration should be a pointer type")
}

return p.getFields("", tp.Elem(), reflect.ValueOf(cfg).Elem())
return p.getFields("", tp.Elem(), reflect.ValueOf(cfg).Elem(), chNotify)
}

func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value) ([]*Field, error) {
func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value, chNotify chan<- ChangeNotification) ([]*Field, error) {
var ff []*Field

for i := 0; i < tp.NumField(); i++ {
Expand All @@ -46,13 +46,13 @@ func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value) ([

switch typ {
case typeField:
fld, err := p.createField(prefix, f, val.Field(i))
fld, err := p.createField(prefix, f, val.Field(i), chNotify)
if err != nil {
return nil, err
}
ff = append(ff, fld)
case typeStruct:
nested, err := p.getFields(prefix+f.Name, f.Type, val.Field(i))
nested, err := p.getFields(prefix+f.Name, f.Type, val.Field(i), chNotify)
if err != nil {
return nil, err
}
Expand All @@ -62,8 +62,8 @@ func (p *parser) getFields(prefix string, tp reflect.Type, val reflect.Value) ([
return ff, nil
}

func (p *parser) createField(prefix string, f reflect.StructField, val reflect.Value) (*Field, error) {
fld := newField(prefix, f, val)
func (p *parser) createField(prefix string, f reflect.StructField, val reflect.Value, chNotify chan<- ChangeNotification) (*Field, error) {
fld := newField(prefix, f, val, chNotify)

value, ok := fld.Sources()[SourceConsul]
if ok {
Expand Down
54 changes: 54 additions & 0 deletions examples/06_notification/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"context"
"log"
"os"
"sync"

"github.com/beatlabs/harvester"
"github.com/beatlabs/harvester/config"
harvestersync "github.com/beatlabs/harvester/sync"
)

type cfg struct {
IndexName harvestersync.String `seed:"customers-v1"`
CacheRetention harvestersync.Int64 `seed:"43200" env:"ENV_CACHE_RETENTION_SECONDS"`
LogLevel harvestersync.String `seed:"DEBUG" flag:"loglevel"`
}

func main() {
ctx, cnl := context.WithCancel(context.Background())
defer cnl()

err := os.Setenv("ENV_CACHE_RETENTION_SECONDS", "86400")
if err != nil {
log.Fatalf("failed to set env var: %v", err)
}

cfg := cfg{}
chNotify := make(chan config.ChangeNotification)
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
for change := range chNotify {
log.Printf("notification: " + change.String())
}
wg.Done()
}()

h, err := harvester.New(&cfg).WithNotification(chNotify).Create()
if err != nil {
log.Fatalf("failed to create harvester: %v", err)
}

err = h.Harvest(ctx)
if err != nil {
log.Fatalf("failed to harvest configuration: %v", err)
}

log.Printf("Config : IndexName: %s, CacheRetention: %d, LogLevel: %s\n", cfg.IndexName.Get(), cfg.CacheRetention.Get(), cfg.LogLevel.Get())
close(chNotify)
wg.Wait()
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/beatlabs/harvester

go 1.13
go 1.15

require (
github.com/hashicorp/go-hclog v0.15.0
github.com/hashicorp/consul/api v1.8.1
github.com/hashicorp/go-hclog v0.15.0
github.com/stretchr/testify v1.6.1
)
Loading

0 comments on commit 7f03dbb

Please sign in to comment.