Skip to content

Commit

Permalink
Merge pull request juju#17122 from SimonRichardson/eventsource-multiw…
Browse files Browse the repository at this point in the history
…atcher

juju#17122

The multiwatcher was originally a notifywatcher, so it emitted changes
for type struct{}. This isn't required, we could just generalize it for type T. 
This moves the multi-watcher from core watcher to event source. 
Eventually, everything in `core/watcher` will be replaced with 
`core/watcher/eventsource` (just not yet).

All these places used a variation of the apiserver common multi-watcher
or the core multiwatcher. By moving to the eventsource version, we're
consistently the same throughout.

----

We need to decide if we coalesce events at the watcher level. I've
currently left that out, but is simple enough to add it back. We already
coalesce events at the database level, then doing additional coalescing
at the watcher level seems overkill. Although I understand why it's in
the current location. For example, if an implementations expect 2 
change events then that could be baked into the code directly. By
stating at least 1 will be received, implementations will likely be
more robust (chaos monkey if you will).

## Checklist

- [x] Code style: imports ordered, good names, simple structure, etc
- [x] Comments saying why design decisions were made
- [x] Go unit tests, with comments saying what you're testing

## QA steps

This tests a lot of aspects of juju, so ensure a bootstrap works and keeping
an eye out for the integration tests will be required.

```sh
$ juju bootstrap lxd test
$ juju add-model default
$ juju deploy ubuntu
```

## Links

**Jira card:** JUJU-
  • Loading branch information
jujubot authored Apr 3, 2024
2 parents f0a9698 + 0c1898f commit 7019749
Show file tree
Hide file tree
Showing 28 changed files with 378 additions and 390 deletions.
4 changes: 2 additions & 2 deletions api/controller/caasmodeloperator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (c *Client) SetPassword(password string) error {

// WatchModelOperatorProvisioningInfo provides a watcher for changes that affect the
// information returned by ModelOperatorProvisioningInfo.
func (c *Client) WatchModelOperatorProvisioningInfo() (watcher.NotifyWatcher, error) {
func (c *Client) WatchModelOperatorProvisioningInfo(ctx context.Context) (watcher.NotifyWatcher, error) {
var result params.NotifyWatchResult
if err := c.facade.FacadeCall(context.TODO(), "WatchModelOperatorProvisioningInfo", nil, &result); err != nil {
if err := c.facade.FacadeCall(ctx, "WatchModelOperatorProvisioningInfo", nil, &result); err != nil {
return nil, err
}
if result.Error != nil {
Expand Down
37 changes: 24 additions & 13 deletions apiserver/common/cloudspec/cloudspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"github.com/juju/juju/apiserver/common"
apiservererrors "github.com/juju/juju/apiserver/errors"
"github.com/juju/juju/apiserver/facade"
"github.com/juju/juju/apiserver/internal"
corewatcher "github.com/juju/juju/core/watcher"
"github.com/juju/juju/core/watcher/eventsource"
environscloudspec "github.com/juju/juju/environs/cloudspec"
"github.com/juju/juju/rpc/params"
"github.com/juju/juju/state"
"github.com/juju/juju/state/watcher"
)

// CloudSpecer defines the CloudSpec api interface
Expand Down Expand Up @@ -199,22 +200,32 @@ func (s CloudSpecAPI) watchCloudSpecChanges(ctx context.Context, tag names.Model
if err != nil {
return result, errors.Trace(err)
}
var watch *common.MultiNotifyWatcher
var watcher eventsource.Watcher[struct{}]
if credentialContentWatch != nil {
watch = common.NewMultiNotifyWatcher(&watcherAdaptor{cloudWatch}, credentialReferenceWatch, &watcherAdaptor{credentialContentWatch})
watcher, err = eventsource.NewMultiNotifyWatcher(ctx,
&watcherAdaptor{NotifyWatcher: cloudWatch},
credentialReferenceWatch,
&watcherAdaptor{NotifyWatcher: credentialContentWatch},
)
} else {
// It's rare but possible that a model does not have a credential.
// In this case there is no point trying to 'watch' content changes.
watch = common.NewMultiNotifyWatcher(&watcherAdaptor{cloudWatch}, credentialReferenceWatch)
}
// Consume the initial event. Technically, API
// calls to Watch 'transmit' the initial event
// in the Watch response. But NotifyWatchers
// have no state to transmit.
if _, ok := <-watch.Changes(); ok {
result.NotifyWatcherId = s.resources.Register(watch)
} else {
return result, watcher.EnsureErr(watch)
watcher, err = eventsource.NewMultiNotifyWatcher(ctx,
&watcherAdaptor{NotifyWatcher: cloudWatch},
credentialReferenceWatch,
)
}
if err != nil {
return result, errors.Trace(err)
}
// Consume the initial result for the API.
_, err = internal.FirstResult[struct{}](ctx, watcher)
if err != nil {
return result, errors.Trace(err)
}

// Ensure we register the watcher, once we know it's working.
result.NotifyWatcherId = s.resources.Register(watcher)

return result, nil
}
109 changes: 0 additions & 109 deletions apiserver/common/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ package common

import (
"context"
"sync"
"time"

"github.com/juju/errors"
"github.com/juju/names/v5"
"gopkg.in/tomb.v2"

apiservererrors "github.com/juju/juju/apiserver/errors"
"github.com/juju/juju/apiserver/facade"
Expand Down Expand Up @@ -86,109 +83,3 @@ func (a *AgentEntityWatcher) Watch(ctx context.Context, args params.Entities) (p
}
return result, nil
}

// MultiNotifyWatcher implements state.NotifyWatcher, combining
// multiple NotifyWatchers.
type MultiNotifyWatcher struct {
tomb tomb.Tomb
watchers []state.NotifyWatcher
changes chan struct{}
}

// NewMultiNotifyWatcher creates a NotifyWatcher that combines
// each of the NotifyWatchers passed in. Each watcher's initial
// event is consumed, and a single initial event is sent.
// Subsequent events are not coalesced.
func NewMultiNotifyWatcher(w ...state.NotifyWatcher) *MultiNotifyWatcher {
m := &MultiNotifyWatcher{
watchers: w,
changes: make(chan struct{}),
}
var wg sync.WaitGroup
wg.Add(len(w))
staging := make(chan struct{})
for _, w := range w {
// Consume the first event of each watcher.
<-w.Changes()
go func(wCopy state.NotifyWatcher) {
defer wg.Done()
_ = wCopy.Wait()
}(w)
// Copy events from the watcher to the staging channel.
go copyEvents(staging, w.Changes(), &m.tomb)
}
m.tomb.Go(func() error {
m.loop(staging)
wg.Wait()
return nil
})
return m
}

// loop copies events from the input channel to the output channel,
// coalescing events by waiting a short time between receiving and
// sending.
func (w *MultiNotifyWatcher) loop(in <-chan struct{}) {
defer close(w.changes)
// out is initialised to m.changes to send the initial event.
out := w.changes
var timer <-chan time.Time
for {
select {
case <-w.tomb.Dying():
return
case <-in:
if timer == nil {
// TODO(fwereade): 2016-03-17 lp:1558657
timer = time.After(10 * time.Millisecond)
}
case <-timer:
timer = nil
out = w.changes
case out <- struct{}{}:
out = nil
}
}
}

// copyEvents copies channel events from "in" to "out", coalescing.
func copyEvents(out chan<- struct{}, in <-chan struct{}, tomb *tomb.Tomb) {
var outC chan<- struct{}
for {
select {
case <-tomb.Dying():
return
case _, ok := <-in:
if !ok {
return
}
outC = out
case outC <- struct{}{}:
outC = nil
}
}
}

func (w *MultiNotifyWatcher) Kill() {
w.tomb.Kill(nil)
for _, w := range w.watchers {
w.Kill()
}
}

func (w *MultiNotifyWatcher) Wait() error {
return w.tomb.Wait()
}

func (w *MultiNotifyWatcher) Stop() error {
w.Kill()
return w.Wait()
}

func (w *MultiNotifyWatcher) Err() error {
return w.tomb.Err()
}

func (w *MultiNotifyWatcher) Changes() <-chan struct{} {
return w.changes
}
37 changes: 0 additions & 37 deletions apiserver/common/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (

"github.com/juju/names/v5"
jc "github.com/juju/testing/checkers"
"github.com/juju/worker/v4/workertest"
gc "gopkg.in/check.v1"

"github.com/juju/juju/apiserver/common"
apiservertesting "github.com/juju/juju/apiserver/testing"
"github.com/juju/juju/rpc/params"
"github.com/juju/juju/state"
statetesting "github.com/juju/juju/state/testing"
)

type agentEntityWatcherSuite struct{}
Expand Down Expand Up @@ -92,38 +90,3 @@ func (*agentEntityWatcherSuite) TestWatchNoArgsNoError(c *gc.C) {
c.Assert(err, jc.ErrorIsNil)
c.Assert(result.Results, gc.HasLen, 0)
}

type multiNotifyWatcherSuite struct{}

var _ = gc.Suite(&multiNotifyWatcherSuite{})

func (*multiNotifyWatcherSuite) TestMultiNotifyWatcher(c *gc.C) {
w0 := apiservertesting.NewFakeNotifyWatcher()
w1 := apiservertesting.NewFakeNotifyWatcher()

mw := common.NewMultiNotifyWatcher(w0, w1)
defer workertest.CleanKill(c, mw)

wc := statetesting.NewNotifyWatcherC(c, mw)
wc.AssertOneChange()

w0.C <- struct{}{}
wc.AssertOneChange()
w1.C <- struct{}{}
wc.AssertOneChange()

w0.C <- struct{}{}
w1.C <- struct{}{}
wc.AssertOneChange()
}

func (*multiNotifyWatcherSuite) TestMultiNotifyWatcherStop(c *gc.C) {
w0 := apiservertesting.NewFakeNotifyWatcher()
w1 := apiservertesting.NewFakeNotifyWatcher()

mw := common.NewMultiNotifyWatcher(w0, w1)
wc := statetesting.NewNotifyWatcherC(c, mw)
wc.AssertOneChange()
statetesting.AssertCanStopWhenSending(c, mw)
wc.AssertClosed()
}
28 changes: 17 additions & 11 deletions apiserver/facades/agent/proxyupdater/proxyupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"github.com/juju/names/v5"
"github.com/juju/proxy"

"github.com/juju/juju/apiserver/common"
apiservererrors "github.com/juju/juju/apiserver/errors"
"github.com/juju/juju/apiserver/facade"
"github.com/juju/juju/apiserver/internal"
"github.com/juju/juju/controller"
"github.com/juju/juju/core/network"
"github.com/juju/juju/core/watcher/eventsource"
"github.com/juju/juju/environs/config"
"github.com/juju/juju/rpc/params"
"github.com/juju/juju/state"
"github.com/juju/juju/state/watcher"
)

// ProxyUpdaterV2 defines the public methods for the v2 facade.
Expand Down Expand Up @@ -88,18 +88,24 @@ func NewAPIV2(controller ControllerBackend, backend Backend, resources facade.Re
func (api *API) oneWatch(ctx context.Context) params.NotifyWatchResult {
var result params.NotifyWatchResult

watch := common.NewMultiNotifyWatcher(
watch, err := eventsource.NewMultiNotifyWatcher(ctx,
api.backend.WatchForModelConfigChanges(),
api.controller.WatchAPIHostPortsForAgents())
api.controller.WatchAPIHostPortsForAgents(),
)
if err != nil {
result.Error = apiservererrors.ServerError(err)
return result
}

if _, ok := <-watch.Changes(); ok {
result = params.NotifyWatchResult{
NotifyWatcherId: api.resources.Register(watch),
}
} else {
result.Error = apiservererrors.ServerError(watcher.EnsureErr(watch))
_, err = internal.FirstResult[struct{}](ctx, watch)
if err != nil {
result.Error = apiservererrors.ServerError(err)
return result
}

return params.NotifyWatchResult{
NotifyWatcherId: api.resources.Register(watch),
}
return result
}

// WatchForProxyConfigAndAPIHostPortChanges watches for changes to the proxy and api host port settings.
Expand Down
24 changes: 14 additions & 10 deletions apiserver/facades/agent/uniter/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/juju/juju/apiserver/common/storagecommon"
apiservererrors "github.com/juju/juju/apiserver/errors"
"github.com/juju/juju/apiserver/facade"
"github.com/juju/juju/apiserver/internal"
"github.com/juju/juju/core/life"
corewatcher "github.com/juju/juju/core/watcher"
"github.com/juju/juju/core/watcher/eventsource"
"github.com/juju/juju/rpc/params"
"github.com/juju/juju/state"
"github.com/juju/juju/state/watcher"
Expand Down Expand Up @@ -307,12 +309,14 @@ func (s *StorageAPI) watchOneStorageAttachment(ctx context.Context, id params.St
if err != nil {
return nothing, errors.Trace(err)
}
if _, ok := <-watch.Changes(); ok {
return params.NotifyWatchResult{
NotifyWatcherId: s.resources.Register(watch),
}, nil

if _, err := internal.FirstResult[struct{}](ctx, watch); err != nil {
return nothing, errors.Trace(err)
}
return nothing, watcher.EnsureErr(watch)

return params.NotifyWatchResult{
NotifyWatcherId: s.resources.Register(watch),
}, nil
}

// RemoveStorageAttachments removes the specified storage
Expand Down Expand Up @@ -407,12 +411,12 @@ func watchStorageAttachment(
storageTag names.StorageTag,
hostTag names.Tag,
unitTag names.UnitTag,
) (state.NotifyWatcher, error) {
) (eventsource.Watcher[struct{}], error) {
storageInstance, err := st.StorageInstance(storageTag)
if err != nil {
return nil, errors.Annotate(err, "getting storage instance")
}
var watchers []state.NotifyWatcher
var watchers []eventsource.Watcher[struct{}]
switch storageInstance.Kind() {
case state.StorageKindBlock:
if stVolume == nil {
Expand All @@ -425,7 +429,7 @@ func watchStorageAttachment(
// We need to watch both the volume attachment, and the
// machine's block devices. A volume attachment's block
// device could change (most likely, become present).
watchers = []state.NotifyWatcher{
watchers = []eventsource.Watcher[struct{}]{
stVolume.WatchVolumeAttachment(hostTag, volume.VolumeTag()),
}

Expand All @@ -452,14 +456,14 @@ func watchStorageAttachment(
if err != nil {
return nil, errors.Annotate(err, "getting storage filesystem")
}
watchers = []state.NotifyWatcher{
watchers = []eventsource.Watcher[struct{}]{
stFile.WatchFilesystemAttachment(hostTag, filesystem.FilesystemTag()),
}
default:
return nil, errors.Errorf("invalid storage kind %v", storageInstance.Kind())
}
watchers = append(watchers, st.WatchStorageAttachment(storageTag, unitTag))
return common.NewMultiNotifyWatcher(watchers...), nil
return eventsource.NewMultiNotifyWatcher(ctx, watchers...)
}

// watcherAdaptor adapts a core watcher to a state watcher.
Expand Down
4 changes: 2 additions & 2 deletions apiserver/facades/agent/uniter/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/juju/juju/apiserver/facades/agent/uniter"
apiservertesting "github.com/juju/juju/apiserver/testing"
"github.com/juju/juju/core/watcher"
"github.com/juju/juju/core/watcher/watchertest"
"github.com/juju/juju/rpc/params"
"github.com/juju/juju/state"
statetesting "github.com/juju/juju/state/testing"
"github.com/juju/juju/testing"
)

Expand Down Expand Up @@ -578,7 +578,7 @@ func (s *watchStorageAttachmentSuite) testWatchStorageAttachment(c *gc.C, change
s.unitTag,
)
c.Assert(err, jc.ErrorIsNil)
wc := statetesting.NewNotifyWatcherC(c, w)
wc := watchertest.NewNotifyWatcherC(c, w)
wc.AssertOneChange()
change()
wc.AssertOneChange()
Expand Down
Loading

0 comments on commit 7019749

Please sign in to comment.