Skip to content

Commit

Permalink
add waitgroup to controller reconcile loop
Browse files Browse the repository at this point in the history
resolves #14
  • Loading branch information
jlarfors committed Mar 14, 2024
1 parent be6a590 commit 596ee71
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 1 deletion.
56 changes: 56 additions & 0 deletions pkg/hz/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"runtime/debug"
"strings"
"sync"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -64,6 +65,12 @@ func WithControllerOwns(obj Objecter) ControllerOption {
}
}

func WithControllerStopTimeout(d time.Duration) ControllerOption {
return func(ro *controllerOption) {
ro.stopTimeout = d
}
}

type controllerOption struct {
bucketObjects string
bucketMutex string
Expand All @@ -75,12 +82,15 @@ type controllerOption struct {

forObject Objecter
reconOwns []Objecter

stopTimeout time.Duration
}

var controllerOptionsDefault = controllerOption{
bucketObjects: BucketObjects,
bucketMutex: BucketMutex,
cueValidator: true,
stopTimeout: time.Minute * 10,
}

func StartController(
Expand All @@ -101,6 +111,9 @@ func StartController(
type Controller struct {
Conn *nats.Conn

wg sync.WaitGroup
stopTimeout time.Duration

subscriptions []*nats.Subscription
consumeContexts []jetstream.ConsumeContext
}
Expand All @@ -116,6 +129,8 @@ func (c *Controller) Start(
if ro.forObject == nil {
return fmt.Errorf("no object provided to controller")
}

c.stopTimeout = ro.stopTimeout
// Check the forObject value is not a pointer, as this causes problems for
// the cue encoder. If it is a pointer, get its element.
if reflect.ValueOf(ro.forObject).Type().Kind() == reflect.Ptr {
Expand Down Expand Up @@ -396,9 +411,48 @@ func (c *Controller) Stop() error {
errs = errors.Join(errs, err)
}
}

// Wait for all reconcile loops to finish, with a timeout.
if c.stopWaitTimeout() {
errs = errors.Join(
errs,
fmt.Errorf(
"timeout after %s waiting for reconcile loops to finish",
c.stopTimeout,
),
)
}
return errs
}

func (c *Controller) stopWaitTimeout() bool {
done := make(chan struct{})
go func() {
defer close(done)
c.wg.Wait()
}()
tickDuration := time.Second * 10
ticker := time.NewTicker(tickDuration)
elapsedTime := time.Duration(0)
for {
elapsedTime += tickDuration
select {
case <-ticker.C:
slog.Info(
"waiting for reconcile loops to finish",
"elapsed",
elapsedTime,
"timeout",
c.stopTimeout,
)
case <-done:
return false // completed normally
case <-time.After(c.stopTimeout):
return true // timed out
}
}
}

// handleControlLoop is the main control loop for the controller.
// - kv is the kv store that the controller is watching
// - mutex is the mutex bucket for the kv store
Expand All @@ -418,6 +472,8 @@ func (c *Controller) handleControlLoop(
msg jetstream.Msg,
ttl time.Duration,
) {
c.wg.Add(1)
defer c.wg.Done()
// Check that the message is the last message for the subject.
// If not, we don't care about it and want to avoid acquiring the lock.
isLast, err := isLastMsg(ctx, kv, msg)
Expand Down
66 changes: 65 additions & 1 deletion pkg/hz/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func TestReconcilerSlow(t *testing.T) {
hz.WithControllerFor(&DummyObject{}),
)
tu.AssertNoError(t, err)
defer ctlr.Stop()
t.Cleanup(func() {
ctlr.Stop()
})

do := DummyObject{
ObjectMeta: hz.ObjectMeta{
Expand All @@ -224,12 +226,74 @@ func TestReconcilerSlow(t *testing.T) {
close(done)
}()

timeBefore := time.Now()
select {
case <-done:
t.Log("Slow reconciler finished")
case <-time.After(time.Second * 15):
t.Fatal("timed out waiting for slow reconciler")
}
if time.Since(timeBefore) < time.Second*5 {
t.Fatal("reconciler did not wait for slow reconciler to run twice")
}
}

type SleepReconciler struct {
dur time.Duration
}

func (r *SleepReconciler) Reconcile(
ctx context.Context,
request hz.Request,
) (hz.Result, error) {
fmt.Println("SleepReconciler is sleeping for ", r.dur.String())
// Simulate a long running process...
time.Sleep(r.dur)
return hz.Result{}, nil
}

// TestReconcilerWaitForFinish tests that the controller waits for the
// reconciler to finish before stopping.
func TestReconcilerWaitForFinish(t *testing.T) {
ctx := context.Background()
ti := server.Test(t, ctx)

client := hz.NewClient(
ti.Conn,
hz.WithClientInternal(true),
hz.WithClientDefaultManager(),
)
dummyClient := hz.ObjectClient[DummyObject]{Client: client}

sr := SleepReconciler{
dur: time.Second * 3,
}
ctlr, err := hz.StartController(
ctx,
ti.Conn,
hz.WithControllerReconciler(&sr),
hz.WithControllerFor(&DummyObject{}),
)
tu.AssertNoError(t, err)

do := DummyObject{
ObjectMeta: hz.ObjectMeta{
Account: "test",
Name: "dummy",
},
}

err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)
// Wait just a moment, before stopping the controller.
time.Sleep(time.Millisecond * 100)
timeBefore := time.Now()
err = ctlr.Stop()
tu.AssertNoError(t, err)

if time.Since(timeBefore) < sr.dur-time.Second {
t.Fatal("controller did not wait for slow reconciler to finish")
}
}

// ConcurrentReconciler is made to test that a reconciler is NEVER called
Expand Down

0 comments on commit 596ee71

Please sign in to comment.