Skip to content

Commit

Permalink
Merge pull request juju#17351 from SimonRichardson/context-aware-simp…
Browse files Browse the repository at this point in the history
…le-worker

juju#17351

All our workers are becoming context compatible. This is because all requests from a worker to a domain (database) need the concept of a cancelation (completion or timeout or deadline). Moving simple worker to context pushes the needle in the right direction.

<!-- Why this change is needed and what it does. -->

## Checklist

<!-- If an item is not applicable, use `~strikethrough~`. -->

- [x] Code style: imports ordered, good names, simple structure, etc
- [x] Comments saying why design decisions were made
- [x] Go unit tests, with comments saying what you're testing

## QA steps

```sh
$ juju bootstrap lxd test
$ juju add-model default
$ juju deploy ubuntu
$ juju destroy-model default
```

There should be no errors when destroying a model, as the undertaker should have cleaned things nicely.

## Links


**Jira card:** JUJU-
  • Loading branch information
jujubot authored May 9, 2024
2 parents 9a19972 + 6ad34c3 commit 4fb706d
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 54 deletions.
6 changes: 3 additions & 3 deletions cmd/jujud-controller/agent/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (s *MachineSuite) TestMachineAgentRunsDiskManagerWorker(c *gc.C) {
started := newSignal()
newWorker := func(diskmanager.ListBlockDevicesFunc, diskmanager.BlockDeviceSetter) worker.Worker {
started.trigger()
return jworker.NewNoOpWorker()
return jworker.NoopWorker()
}
s.PatchValue(&diskmanager.NewWorker, newWorker)

Expand Down Expand Up @@ -424,7 +424,7 @@ func (s *MachineSuite) TestMachineAgentRunsMachineStorageWorker(c *gc.C) {
c.Check(config.Scope, gc.Equals, m.Tag())
c.Check(config.Validate(), jc.ErrorIsNil)
started.trigger()
return jworker.NewNoOpWorker(), nil
return jworker.NoopWorker(), nil
}
s.PatchValue(&storageprovisioner.NewStorageProvisioner, newWorker)

Expand Down Expand Up @@ -501,7 +501,7 @@ func (s *MachineSuite) setupIgnoreAddresses(c *gc.C, expectedIgnoreValue bool) c

// The test just cares that NewMachiner is called with the correct
// value, nothing else is done with the worker.
return newDummyWorker(), nil
return jworker.NoopWorker(), nil
})

attrs := coretesting.Attrs{"ignore-machine-addresses": expectedIgnoreValue}
Expand Down
7 changes: 0 additions & 7 deletions cmd/jujud-controller/agent/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,6 @@ func runWithTimeout(c *gc.C, r runner) error {
return fmt.Errorf("timed out waiting for agent to finish; stop error: %v", err)
}

func newDummyWorker() worker.Worker {
return jworker.NewSimpleWorker(func(stop <-chan struct{}) error {
<-stop
return nil
})
}

type FakeConfig struct {
agent.ConfigSetter
values map[string]string
Expand Down
3 changes: 2 additions & 1 deletion cmd/jujud/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/juju/juju/cmd/jujud/agent/agenttest"
"github.com/juju/juju/core/network"
imagetesting "github.com/juju/juju/environs/imagemetadata/testing"
jworker "github.com/juju/juju/internal/worker"
"github.com/juju/juju/internal/worker/proxyupdater"
)

Expand Down Expand Up @@ -66,7 +67,7 @@ func (s *AgentSuite) SetUpTest(c *gc.C) {
err = st.SetAPIHostPorts(controllerConfig, hostPorts, hostPorts)
c.Assert(err, jc.ErrorIsNil)
s.PatchValue(&proxyupdater.NewWorker, func(proxyupdater.Config) (worker.Worker, error) {
return newDummyWorker(), nil
return jworker.NoopWorker(), nil
})

// Tests should not try to use internet. Ensure base url is empty.
Expand Down
6 changes: 3 additions & 3 deletions cmd/jujud/agent/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (s *MachineSuite) TestMachineAgentRunsDiskManagerWorker(c *gc.C) {
started := newSignal()
newWorker := func(diskmanager.ListBlockDevicesFunc, diskmanager.BlockDeviceSetter) worker.Worker {
started.trigger()
return jworker.NewNoOpWorker()
return jworker.NoopWorker()
}
s.PatchValue(&diskmanager.NewWorker, newWorker)

Expand Down Expand Up @@ -405,7 +405,7 @@ func (s *MachineSuite) TestMachineAgentRunsMachineStorageWorker(c *gc.C) {
c.Check(config.Scope, gc.Equals, m.Tag())
c.Check(config.Validate(), jc.ErrorIsNil)
started.trigger()
return jworker.NewNoOpWorker(), nil
return jworker.NoopWorker(), nil
}
s.PatchValue(&storageprovisioner.NewStorageProvisioner, newWorker)

Expand All @@ -427,7 +427,7 @@ func (s *MachineSuite) setupIgnoreAddresses(c *gc.C, expectedIgnoreValue bool) c

// The test just cares that NewMachiner is called with the correct
// value, nothing else is done with the worker.
return newDummyWorker(), nil
return jworker.NoopWorker(), nil
})

attrs := coretesting.Attrs{"ignore-machine-addresses": expectedIgnoreValue}
Expand Down
7 changes: 0 additions & 7 deletions cmd/jujud/agent/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,6 @@ func runWithTimeout(c *gc.C, r runner) error {
return fmt.Errorf("timed out waiting for agent to finish; stop error: %v", err)
}

func newDummyWorker() worker.Worker {
return jworker.NewSimpleWorker(func(stop <-chan struct{}) error {
<-stop
return nil
})
}

type FakeConfig struct {
agent.ConfigSetter
values map[string]string
Expand Down
4 changes: 3 additions & 1 deletion internal/worker/caasenvironupgrader/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package caasenvironupgrader

import (
"context"

"github.com/juju/errors"
"github.com/juju/names/v5"
"github.com/juju/worker/v4"
Expand Down Expand Up @@ -61,7 +63,7 @@ func NewWorker(config Config) (worker.Worker, error) {
}
// There are no upgrade steps for a CAAS model.
// We just set the status to available and unlock the gate.
return jujuworker.NewSimpleWorker(func(<-chan struct{}) error {
return jujuworker.NewSimpleWorker(func(context.Context) error {
setStatus := func(s status.Status, info string) error {
return config.Facade.SetModelStatus(config.ModelTag, s, info, nil)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/environupgrader/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func newUpgradeWorker(config Config, targetVersion int) (worker.Worker, error) {
return nil, errors.Trace(err)
}

return jujuworker.NewSimpleWorker(func(<-chan struct{}) error {
return jujuworker.NewSimpleWorker(func(ctx stdcontext.Context) error {
// NOTE(axw) the abort channel is ignored, because upgrade
// steps are not interruptible. If we find they need to be
// interruptible, we should consider passing through a
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/identityfilewriter/manifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newWorker(ctx context.Context, a agent.Agent, apiCaller base.APICaller) (wo
}

var NewWorker = func(agentConfig agent.Config) (worker.Worker, error) {
inner := func(<-chan struct{}) error {
inner := func(ctx context.Context) error {
return agent.WriteSystemIdentityFile(agentConfig)
}
return jworker.NewSimpleWorker(inner), nil
Expand Down
10 changes: 5 additions & 5 deletions internal/worker/logsender/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogSenderAPI interface {
// New starts a logsender worker which reads log message structs from
// a channel and sends them to the controller via the logsink API.
func New(logs LogRecordCh, logSenderAPI LogSenderAPI) worker.Worker {
loop := func(stop <-chan struct{}) error {
loop := func(ctx context.Context) error {
// It has been observed that sometimes the logsender.API gets wedged
// attempting to get the LogWriter while the agent is being torn down,
// and the call to logSenderAPI.LogWriter() doesn't return. This stops
Expand All @@ -40,13 +40,13 @@ func New(logs LogRecordCh, logSenderAPI LogSenderAPI) worker.Worker {
if err != nil {
select {
case errChan <- err:
case <-stop:
case <-ctx.Done():
}
return
}
select {
case sender <- logWriter:
case <-stop:
case <-ctx.Done():
logWriter.Close()
}
}()
Expand All @@ -56,7 +56,7 @@ func New(logs LogRecordCh, logSenderAPI LogSenderAPI) worker.Worker {
case logWriter = <-sender:
case err = <-errChan:
return errors.Annotate(err, "logsender dial failed")
case <-stop:
case <-ctx.Done():
return nil
}
defer logWriter.Close()
Expand Down Expand Up @@ -101,7 +101,7 @@ func New(logs LogRecordCh, logSenderAPI LogSenderAPI) worker.Worker {
}
}

case <-stop:
case <-ctx.Done():
return nil
}
}
Expand Down
15 changes: 7 additions & 8 deletions internal/worker/noopworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
package worker

import (
"context"

"github.com/juju/worker/v4"
)

func NewNoOpWorker() worker.Worker {
return NewSimpleWorker(doNothing)
}

func doNothing(stop <-chan struct{}) error {
select {
case <-stop:
// NoopWorker returns a worker that waits for the context to be done.
func NoopWorker() worker.Worker {
return NewSimpleWorker(func(ctx context.Context) error {
<-ctx.Done()
return nil
}
})
}
8 changes: 4 additions & 4 deletions internal/worker/periodicworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ func (s *periodicWorkerSuite) TestWait(c *gc.C) {
funcHasRun := make(chan struct{})
doWork := func(_ <-chan struct{}) error {
funcHasRun <- struct{}{}
return testError
return errTest
}

w := NewPeriodicWorker(doWork, defaultPeriod, NewTimer)
defer func() { c.Assert(worker.Stop(w), gc.Equals, testError) }()
defer func() { c.Assert(worker.Stop(w), gc.Equals, errTest) }()
select {
case <-funcHasRun:
case <-time.After(testing.ShortWait):
c.Fatalf("The doWork function should have been called by now")
}
w.Kill()
c.Assert(w.Wait(), gc.Equals, testError)
c.Assert(w.Wait(), gc.Equals, errTest)
select {
case <-funcHasRun:
c.Fatalf("After the kill we don't expect anymore calls to the function")
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *periodicWorkerSuite) TestKill(c *gc.C) {
ExpectedValue error
}{
{nil, nil},
{testError, testError},
{errTest, errTest},
{ErrKilled, nil},
}

Expand Down
9 changes: 7 additions & 2 deletions internal/worker/simpleworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package worker

import (
"context"

"github.com/juju/worker/v4"
"gopkg.in/tomb.v2"
)
Expand All @@ -16,10 +18,13 @@ type simpleWorker struct {
// NewSimpleWorker returns a worker that runs the given function. The
// stopCh argument will be closed when the worker is killed. The error returned
// by the doWork function will be returned by the worker's Wait function.
func NewSimpleWorker(doWork func(stopCh <-chan struct{}) error) worker.Worker {
func NewSimpleWorker(doWork func(context.Context) error) worker.Worker {
w := &simpleWorker{}
w.tomb.Go(func() error {
return doWork(w.tomb.Dying())
ctx, cancel := context.WithCancel(w.tomb.Context(context.Background()))
defer cancel()

return doWork(ctx)
})
return w
}
Expand Down
19 changes: 10 additions & 9 deletions internal/worker/simpleworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package worker

import (
"context"
"errors"

gc "gopkg.in/check.v1"
Expand All @@ -17,19 +18,19 @@ type simpleWorkerSuite struct {

var _ = gc.Suite(&simpleWorkerSuite{})

var testError = errors.New("test error")
var errTest = errors.New("test error")

func (s *simpleWorkerSuite) TestWait(c *gc.C) {
doWork := func(_ <-chan struct{}) error {
return testError
doWork := func(context.Context) error {
return errTest
}

w := NewSimpleWorker(doWork)
c.Assert(w.Wait(), gc.Equals, testError)
c.Assert(w.Wait(), gc.Equals, errTest)
}

func (s *simpleWorkerSuite) TestWaitNil(c *gc.C) {
doWork := func(_ <-chan struct{}) error {
doWork := func(context.Context) error {
return nil
}

Expand All @@ -38,14 +39,14 @@ func (s *simpleWorkerSuite) TestWaitNil(c *gc.C) {
}

func (s *simpleWorkerSuite) TestKill(c *gc.C) {
doWork := func(stopCh <-chan struct{}) error {
<-stopCh
return testError
doWork := func(ctx context.Context) error {
<-ctx.Done()
return errTest
}

w := NewSimpleWorker(doWork)
w.Kill()
c.Assert(w.Wait(), gc.Equals, testError)
c.Assert(w.Wait(), gc.Equals, errTest)

// test we can kill again without a panic
w.Kill()
Expand Down
5 changes: 3 additions & 2 deletions internal/worker/undertaker/undertaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ func (u *Undertaker) run() (errOut error) {

// Watch for changes to model destroy values, if so, cancel the context
// and restart the worker.
err = u.catacomb.Add(worker.NewSimpleWorker(func(stopCh <-chan struct{}) error {
err = u.catacomb.Add(worker.NewSimpleWorker(func(ctx context.Context) error {
for {
select {
case <-stopCh:
case <-ctx.Done():
return nil

case <-modelWatcher.Changes():
result, err := u.config.Facade.ModelInfo()
if errors.Is(err, errors.NotFound) || err != nil || result.Error != nil {
Expand Down

0 comments on commit 4fb706d

Please sign in to comment.