From e59278653c1a1cd2c3d92d28f345b1fd07cfaff0 Mon Sep 17 00:00:00 2001 From: michaeljguarino Date: Wed, 25 Oct 2023 04:41:05 -0400 Subject: [PATCH] Implement websocket notifications to agent (#68) Needed to fork and fix the underlying phoenix websocket transport library, but this can technically flake and the system still function fine, so no big deal. Have tested and verified: * socket is established * manual publish transmits to the worker near-instantly --- cmd/main.go | 2 +- go.mod | 9 +++-- go.sum | 15 ++++++++ pkg/agent/agent.go | 17 ++++++++- pkg/websocket/socket.go | 76 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 pkg/websocket/socket.go diff --git a/cmd/main.go b/cmd/main.go index e5ad6e45..e9f02f72 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -72,7 +72,7 @@ func main() { os.Exit(1) } - a, err := agent.New(mgr.GetConfig(), refresh, consoleUrl, deployToken) + a, err := agent.New(mgr.GetConfig(), refresh, consoleUrl, deployToken, clusterId) if err != nil { setupLog.Error(err, "unable to create agent") os.Exit(1) diff --git a/go.mod b/go.mod index a15a5ffe..288a059b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/alitto/pond v1.8.3 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/osteele/liquid v1.3.2 - github.com/pluralsh/console-client-go v0.0.18 + github.com/pluralsh/console-client-go v0.0.21 github.com/pluralsh/polly v0.1.4 github.com/samber/lo v1.38.1 github.com/spf13/pflag v1.0.5 @@ -54,13 +54,17 @@ require ( github.com/google/gofuzz v1.1.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect github.com/huandu/xstrings v1.3.3 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kataras/golog v0.1.9 // indirect + github.com/kataras/pio v0.0.12 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect @@ -78,6 +82,7 @@ require ( github.com/osteele/tuesday v1.0.3 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pluralsh/gophoenix v0.1.3-0.20231024165338-04291b4de463 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -98,7 +103,7 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.4.0 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect - golang.org/x/sys v0.6.0 // indirect + golang.org/x/sys v0.9.0 // indirect golang.org/x/term v0.3.0 // indirect golang.org/x/text v0.8.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index 7d4ba5bc..6f7c222c 100644 --- a/go.sum +++ b/go.sum @@ -201,6 +201,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -218,6 +220,7 @@ github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9q github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -229,6 +232,10 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/kataras/golog v0.1.9 h1:vLvSDpP7kihFGKFAvBSofYo7qZNULYSHOH2D7rPTKJk= +github.com/kataras/golog v0.1.9/go.mod h1:jlpk/bOaYCyqDqH18pgDHdaJab72yBE6i0O3s30hpWY= +github.com/kataras/pio v0.0.12 h1:o52SfVYauS3J5X08fNjlGS5arXHjW/ItLkyLcKjoH6w= +github.com/kataras/pio v0.0.12/go.mod h1:ODK/8XBhhQ5WqrAhKy+9lTPS7sBf6O3KcLhc9klfRcY= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -296,6 +303,12 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pluralsh/console-client-go v0.0.18 h1:GhfThwExfyQbU+NjMLdxH0KPuVJjdsI5NdK0cdTcakA= github.com/pluralsh/console-client-go v0.0.18/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU= +github.com/pluralsh/console-client-go v0.0.21 h1:CacHCqes793bOJbE5pFj3q+P9mSUevPk/IjbI/lF6tw= +github.com/pluralsh/console-client-go v0.0.21/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU= +github.com/pluralsh/gophoenix v0.1.3-0.20231024044735-d0f772170497 h1:FIrclcMZV27uUoOWGq7bBSi8cDTbJrCzoXRZklnnET8= +github.com/pluralsh/gophoenix v0.1.3-0.20231024044735-d0f772170497/go.mod h1:IagWXKFYu6NTHzcJx2dJyrIlZ1Sv2PH3fhOtplA9qOs= +github.com/pluralsh/gophoenix v0.1.3-0.20231024165338-04291b4de463 h1:PtXJG2hM73c98bqYAaFNtY+eJ1RupEArHSarNzi3Hek= +github.com/pluralsh/gophoenix v0.1.3-0.20231024165338-04291b4de463/go.mod h1:IagWXKFYu6NTHzcJx2dJyrIlZ1Sv2PH3fhOtplA9qOs= github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY= github.com/pluralsh/polly v0.1.4/go.mod h1:Yo1/jcW+4xwhWG+ZJikZy4J4HJkMNPZ7sq5auL2c/tY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -538,6 +551,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index dce5b48f..baa433b5 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -8,6 +8,7 @@ import ( "github.com/pluralsh/deployment-operator/pkg/client" "github.com/pluralsh/deployment-operator/pkg/manifests" deploysync "github.com/pluralsh/deployment-operator/pkg/sync" + "github.com/pluralsh/deployment-operator/pkg/websocket" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/discovery" @@ -29,10 +30,11 @@ type Agent struct { engine *deploysync.Engine deathChan chan interface{} svcQueue workqueue.RateLimitingInterface + socket *websocket.Socket refresh time.Duration } -func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken string) (*Agent, error) { +func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken, clusterId string) (*Agent, error) { dc, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { return nil, err @@ -45,6 +47,14 @@ func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken str deathChan := make(chan interface{}) invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone} + socket, err := websocket.New(clusterId, consoleUrl, deployToken, svcQueue) + if err != nil { + if socket == nil { + return nil, err + } + log.Error(err, "could not initiate websocket connection, ignoring and falling back to polling") + } + f := newFactory(config) applier, err := newApplier(invFactory, f) @@ -60,6 +70,7 @@ func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken str engine: engine, deathChan: deathChan, svcQueue: svcQueue, + socket: socket, refresh: refresh, }, nil } @@ -75,6 +86,10 @@ func (agent *Agent) Run() { } }() + if err := agent.socket.Join(); err != nil { + log.Error(err, "could not establish websocket to upstream") + } + err := wait.PollInfinite(agent.refresh, func() (done bool, err error) { log.Info("fetching services for cluster") svcs, err := agent.consoleClient.GetServices() diff --git a/pkg/websocket/socket.go b/pkg/websocket/socket.go new file mode 100644 index 00000000..392ab031 --- /dev/null +++ b/pkg/websocket/socket.go @@ -0,0 +1,76 @@ +package websocket + +import ( + "fmt" + "net/http" + "net/url" + + phx "github.com/pluralsh/gophoenix" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2/klogr" +) + +var ( + log = klogr.New() +) + +type Socket struct { + clusterId string + client *phx.Client + svcQueue workqueue.RateLimitingInterface + channel *phx.Channel +} + +func New(clusterId, consoleUrl, deployToken string, svcQueue workqueue.RateLimitingInterface) (*Socket, error) { + socket := &Socket{svcQueue: svcQueue, clusterId: clusterId} + client := phx.NewClient(socket) + + uri, err := wssUri(consoleUrl, deployToken) + if err != nil { + return nil, err + } + + err = client.Connect(*uri, http.Header{}) + socket.client = client + return socket, err +} + +func (s *Socket) Join() error { + channel, err := s.client.Join(s, fmt.Sprintf("cluster:%s", s.clusterId), map[string]string{}) + s.channel = channel + return err +} + +func wssUri(consoleUrl, deployToken string) (*url.URL, error) { + uri, err := url.Parse(consoleUrl) + if err != nil { + return nil, err + } + wssUrl := fmt.Sprintf("wss://%s/ext/socket/websocket", uri.Host) + values, err := url.ParseQuery("vsn=2.0.0") + if err != nil { + return nil, err + } + + values.Add("token", deployToken) + finalUrl := fmt.Sprintf("%s?%s", wssUrl, values.Encode()) + return uri.Parse(finalUrl) +} + +func (s *Socket) NotifyConnect() {} +func (s *Socket) NotifyDisconnect() {} + +// implement ChannelReceiver +func (s *Socket) OnJoin(payload interface{}) {} +func (s *Socket) OnJoinError(payload interface{}) {} +func (s *Socket) OnChannelClose(payload interface{}, joinRef int64) {} +func (s *Socket) OnMessage(ref int64, event string, payload interface{}) { + if event == "service.event" { + if parsed, ok := payload.(map[string]interface{}); ok { + if id, ok := parsed["id"].(string); ok { + log.Info("got new service update from websocket", "id", id) + s.svcQueue.Add(id) + } + } + } +}