Skip to content

Commit

Permalink
Merge pull request juju#16961 from Aflynn50/controlleragentconfig-socket
Browse files Browse the repository at this point in the history
juju#16961

<!-- Why this change is needed and what it does. -->
The current approach of using a SIGHUP signal sent by the controller charm to notify the `controlleragentconfig` worker of a config change only works on machines, not on kubernetes. This PR adds a new socket (`configchange.socket`) that can be used to request a config reload. A `POST` request can be sent to `configchange.socket` at the endpoint `/reload` which will be picked up by the `controlleragentconfig` worker and cause all watchers of the config to be restarted.

This change factors out the socket listener from the `controlsocket` worker to a new package `internal/socketlistener` which is then used in `controlsocket` worker and the `controlleragentconfig` worker.

The `controlsocket` package is refactored to move `handlers.go` and `handlers_test.go` into `worker.go` and `worker_test.go` (which accounts for a decent chunk of the diff of this PR). 



## Checklist

<!-- If an item is not applicable, use `~strikethrough~`. -->

- [x] Code style: imports ordered, good names, simple structure, etc
- [x] Comments saying why design decisions were made
- [x] Go unit tests, with comments saying what you're testing
- [x] [Integration tests](https://github.com/juju/juju/tree/main/tests), with comments saying what you're testing
- [x] [doc.go](https://discourse.charmhub.io/t/readme-in-packages/451) added or updated in changed packages

## QA steps
### Unit tests
```
go test github.com/juju/juju/internal/worker/controlleragentconfig -check.vv
go test github.com/juju/juju/internal/socketlistener -check.vv
go test github.com/juju/juju/internal/worker/controlsocket -check.vv 
```

### Manual
Manually check that the socket exists and that it responds. This should be checked on both machine and k8s clouds.
```
make microk8s-operator-update
juju bootstrap [cloud] microk8s
juju switch controller
juju ssh controller/0
stat /var/lib/juju/configchange.socket
```
Curl the socket (the socket is protected on machine clouds so `sudo` is needed):
```
(sudo) curl -i -X POST http://localhost/reload --unix-socket /var/lib/juju/configchange.socket
```
Now check in `juju debug-log -m controller` for:
```
INFO juju.worker.controlleragentconfig reload config request received, reloading config
``` 
### Integration
Get the modified version of the controller charm juju/juju-controller#67
- `charmcraft pack` this charm.
- Bootstrap a new LXD controller using `--controller-charm-path <path-to-packed-artefact>`.
- `juju enable-ha`.
- Once all units have joined the relation, ssh to the controller machine and check _/var/lib/juju/agents/controller-0/agent.conf_. It should be populated along the lines of:
```
db-bind-addresses:
 controller/0: 10.246.27.181
 controller/1: 10.246.27.53
 controller/2: 10.246.27.12
```
- `juju debug-log -m controller --replay|grep "controlleragentconfig reload config request received, reloading config"` indicate the config has reloaded for each controller

## Links

**Jira card:** JUJU-5453
  • Loading branch information
jujubot authored Mar 4, 2024
2 parents 1b1339d + a64887e commit 4bfe4ab
Show file tree
Hide file tree
Showing 14 changed files with 1,196 additions and 887 deletions.
15 changes: 9 additions & 6 deletions cmd/jujud-controller/agent/machine/manifolds.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,10 @@ func commonManifolds(config ManifoldsConfig) dependency.Manifolds {
// Controller agent config manifold watches the controller
// agent config and bounces if it changes.
controllerAgentConfigName: ifController(controlleragentconfig.Manifold(controlleragentconfig.ManifoldConfig{
Clock: config.Clock,
Logger: loggo.GetLogger("juju.worker.controlleragentconfig"),
Clock: config.Clock,
Logger: loggo.GetLogger("juju.worker.controlleragentconfig"),
NewSocketListener: controlleragentconfig.NewSocketListener,
SocketName: paths.ConfigChangeSocket(paths.OSUnixLike),
})),

// The stateconfigwatcher manifold watches the machine agent's
Expand Down Expand Up @@ -820,10 +822,11 @@ func commonManifolds(config ManifoldsConfig) dependency.Manifolds {

// The controlsocket worker runs on the controller machine.
controlSocketName: ifController(controlsocket.Manifold(controlsocket.ManifoldConfig{
StateName: stateName,
Logger: loggo.GetLogger("juju.worker.controlsocket"),
NewWorker: controlsocket.NewWorker,
SocketName: paths.ControlSocket(paths.OSUnixLike),
StateName: stateName,
Logger: loggo.GetLogger("juju.worker.controlsocket"),
NewWorker: controlsocket.NewWorker,
NewSocketListener: controlsocket.NewSocketListener,
SocketName: paths.ControlSocket(paths.OSUnixLike),
})),

objectStoreName: ifController(objectstore.Manifold(objectstore.ManifoldConfig{
Expand Down
7 changes: 7 additions & 0 deletions core/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
curtinInstallConfig
transientDataDir
controlSocket
configChangeSocket
)

const (
Expand Down Expand Up @@ -68,6 +69,7 @@ var nixVals = map[osVarType]string{
cloudInitCfgDir: "/etc/cloud/cloud.cfg.d",
curtinInstallConfig: "/root/curtin-install-cfg.yaml",
controlSocket: "/var/lib/juju/control.socket",
configChangeSocket: "/var/lib/juju/configchange.socket",
}

var winVals = map[osVarType]string{
Expand Down Expand Up @@ -203,3 +205,8 @@ func CloudInitCfgDir(os OS) string {
func ControlSocket(os OS) string {
return osVal(os, controlSocket)
}

// ConfigChangeSocket returns the absolute path to the Juju config change socket.
func ConfigChangeSocket(os OS) string {
return osVal(os, configChangeSocket)
}
14 changes: 14 additions & 0 deletions internal/socketlistener/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2024 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package socketlistener_test

import (
"testing"

gc "gopkg.in/check.v1"
)

func Test(t *testing.T) {
gc.TestingT(t)
}
127 changes: 127 additions & 0 deletions internal/socketlistener/socketlistener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2024 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

// Package socketlistener provides a worker that will listen on a specified unix
// socket identified by a file descriptor. Handlers are provided to the worker
// that specify endpoints and define the action to be taken when they are
// reached.
package socketlistener

import (
"context"
"net"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/juju/errors"
"gopkg.in/tomb.v2"

"github.com/juju/juju/juju/sockets"
)

// Logger represents the methods used by the worker to log information.
type Logger interface {
Warningf(string, ...any)
Debugf(string, ...any)
}

// Config represents configuration for the socketlistener worker.
type Config struct {
Logger Logger
// SocketName is the socket file descriptor.
SocketName string
// RegisterHandlers should register handlers on the router with
// router.HandlerFunc or similar.
RegisterHandlers func(router *mux.Router)
// ShutdownTimeout is how long the socketlistener has to gracefully shutdown
// when Kill is called on the worker.
ShutdownTimeout time.Duration
}

// Validate returns an error if config cannot drive the SocketListener.
func (config Config) Validate() error {
if config.Logger == nil {
return errors.NotValidf("nil Logger")
}
if config.SocketName == "" {
return errors.NotValidf("empty SocketName")
}
if config.RegisterHandlers == nil {
return errors.NotValidf("nil RegisterHandlers func")
}
if config.ShutdownTimeout == 0 {
return errors.NotValidf("zero value for ShutdownTimeout")
}
return nil
}

// SocketListener is a socketlistener worker.
type SocketListener struct {
config Config
tomb tomb.Tomb
listener net.Listener
shutdownTimeout time.Duration
}

// NewSocketListener returns a socketlistener with the given config.
func NewSocketListener(config Config) (*SocketListener, error) {
if err := config.Validate(); err != nil {
return nil, errors.Trace(err)
}

l, err := sockets.Listen(sockets.Socket{
Address: config.SocketName,
Network: "unix",
})
if err != nil {
return nil, errors.Annotate(err, "unable to listen on unix socket")
}
config.Logger.Debugf("socketlistener listening on socket %q", config.SocketName)

sl := &SocketListener{
config: config,
listener: l,
}
sl.tomb.Go(sl.run)
return sl, nil
}

// Kill is part of the Worker.worker interface.
func (sl *SocketListener) Kill() {
sl.tomb.Kill(nil)
}

// Wait is part of the Worker.worker interface.
func (sl *SocketListener) Wait() error {
return sl.tomb.Wait()
}

// run listens on the control socket and handles requests.
func (sl *SocketListener) run() error {
router := mux.NewRouter()
sl.config.RegisterHandlers(router)

srv := http.Server{Handler: router}
defer func() {
err := srv.Close()
if err != nil {
sl.config.Logger.Warningf("error closing HTTP server: %v", err)
}
}()

sl.tomb.Go(func() error {
// Wait for the tomb to start dying and then shut the server down.
<-sl.tomb.Dying()
ctx, cancel := context.WithTimeout(context.Background(), sl.shutdownTimeout)
defer cancel()
return srv.Shutdown(ctx)
})

sl.config.Logger.Debugf("socketlistener now serving on socket %q", sl.config.SocketName)
defer sl.config.Logger.Debugf("socketlistener finished serving on socket %q", sl.config.SocketName)
if err := srv.Serve(sl.listener); !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
}
166 changes: 166 additions & 0 deletions internal/socketlistener/socketlistener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2024 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package socketlistener_test

import (
"context"
"fmt"
"io/fs"
"net"
"net/http"
"os"
"path"
"time"

"github.com/gorilla/mux"
jc "github.com/juju/testing/checkers"
"github.com/juju/worker/v4/workertest"
gc "gopkg.in/check.v1"

coretesting "github.com/juju/juju/core/testing"
"github.com/juju/juju/internal/socketlistener"
)

type socketListenerSuite struct {
logger *fakeLogger
}

var _ = gc.Suite(&socketListenerSuite{})

func (s *socketListenerSuite) SetUpTest(c *gc.C) {
s.logger = &fakeLogger{}
}

func handleTestEndpoint1(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(http.StatusOK)
}

func registerTestHandlers(r *mux.Router) {
r.HandleFunc("/test-endpoint", handleTestEndpoint1).
Methods(http.MethodGet)
}

func (s *socketListenerSuite) TestStartStopWorker(c *gc.C) {
tmpDir := c.MkDir()
socket := path.Join(tmpDir, "test.socket")

sl, err := socketlistener.NewSocketListener(socketlistener.Config{
Logger: s.logger,
SocketName: socket,
RegisterHandlers: registerTestHandlers,
ShutdownTimeout: coretesting.LongWait,
})
c.Assert(err, jc.ErrorIsNil)

// Check socket is created with correct permissions.
fi, err := os.Stat(socket)
c.Assert(err, jc.ErrorIsNil)
c.Assert(fi.Mode(), gc.Equals, fs.ModeSocket|0700)

// Check server is up.
cl := client(socket)
resp, err := cl.Get("http://localhost:8080/foo")
c.Assert(err, jc.ErrorIsNil)
c.Assert(resp.StatusCode, gc.Equals, http.StatusNotFound)

// Check server is serving.
cl = client(socket)
resp, err = cl.Get("http://localhost:8080/test-endpoint")
c.Assert(err, jc.ErrorIsNil)
c.Assert(resp.StatusCode, gc.Equals, http.StatusOK)

sl.Kill()
err = sl.Wait()
c.Assert(err, jc.ErrorIsNil)

// Check server has stopped.
resp, err = cl.Get("http://localhost:8080/foo")
c.Assert(err, gc.ErrorMatches, ".*connection refused")

// No warnings/errors should have been logged.
for _, entry := range s.logger.entries {
if entry.level == "ERROR" || entry.level == "WARNING" {
c.Errorf("%s: %s", entry.level, entry.msg)
}
}
}

// TestEnsureShutdown checks that a slow handler does not return an error if the
// socket listener is shutdown as it handles.
func (s *socketListenerSuite) TestEnsureShutdown(c *gc.C) {
tmpDir := c.MkDir()
socket := path.Join(tmpDir, "test.socket")

start := make(chan struct{})
sl, err := socketlistener.NewSocketListener(socketlistener.Config{
Logger: s.logger,
SocketName: socket,
RegisterHandlers: func(r *mux.Router) {
r.HandleFunc("/slow-handler", func(resp http.ResponseWriter, req *http.Request) {
// Signal that the handler has started.
close(start)
time.Sleep(time.Second)
resp.WriteHeader(http.StatusOK)
}).Methods(http.MethodGet)
},
ShutdownTimeout: coretesting.LongWait,
})
c.Assert(err, jc.ErrorIsNil)
defer workertest.DirtyKill(c, sl)
done := make(chan struct{})
go func() {
// Send request to slow handler and ensure it does not return error,
// even though server is shut down as soon as it starts.
cl := client(socket)
_, err := cl.Get("http://localhost:8080/slow-handler")
c.Assert(err, jc.ErrorIsNil)
}()

go func() {
// Kill socket listener once handler has started.
select {
case <-start:
case <-time.After(coretesting.ShortWait):
c.Errorf("took too long to start")
}
workertest.CleanKill(c, sl)
close(done)
}()
// Wait for server to cleanly shutdown
select {
case <-done:
case <-time.After(coretesting.LongWait):
c.Errorf("took too long to finish")
}
}

// Return an *http.Client with custom transport that allows it to connect to
// the given Unix socket.
func client(socketPath string) *http.Client {
return &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (conn net.Conn, err error) {
return net.Dial("unix", socketPath)
},
},
}
}

type fakeLogger struct {
entries []logEntry
}

type logEntry struct{ level, msg string }

func (f *fakeLogger) write(level string, format string, args ...any) {
f.entries = append(f.entries, logEntry{level, fmt.Sprintf(format, args...)})
}

func (f *fakeLogger) Warningf(format string, args ...any) {
f.write("WARNING", format, args...)
}

func (f *fakeLogger) Debugf(format string, args ...any) {
f.write("DEBUG", format, args...)
}
Loading

0 comments on commit 4bfe4ab

Please sign in to comment.