Skip to content

Commit

Permalink
Implement websocket notifications to agent (#68)
Browse files Browse the repository at this point in the history
Needed to fork and fix the underlying phoenix websocket transport library, but this can technically flake and the system still function fine, so no big deal.  Have tested and verified:

* socket is established
* manual publish transmits to the worker near-instantly
  • Loading branch information
michaeljguarino authored Oct 25, 2023
1 parent 1b87a4f commit e592786
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
os.Exit(1)
}

a, err := agent.New(mgr.GetConfig(), refresh, consoleUrl, deployToken)
a, err := agent.New(mgr.GetConfig(), refresh, consoleUrl, deployToken, clusterId)
if err != nil {
setupLog.Error(err, "unable to create agent")
os.Exit(1)
Expand Down
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/alitto/pond v1.8.3
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/osteele/liquid v1.3.2
github.com/pluralsh/console-client-go v0.0.18
github.com/pluralsh/console-client-go v0.0.21
github.com/pluralsh/polly v0.1.4
github.com/samber/lo v1.38.1
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -54,13 +54,17 @@ require (
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/huandu/xstrings v1.3.3 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kataras/golog v0.1.9 // indirect
github.com/kataras/pio v0.0.12 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
Expand All @@ -78,6 +82,7 @@ require (
github.com/osteele/tuesday v1.0.3 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pluralsh/gophoenix v0.1.3-0.20231024165338-04291b4de463 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -98,7 +103,7 @@ require (
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.3.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand All @@ -218,6 +220,7 @@ github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9q
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand All @@ -229,6 +232,10 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/kataras/golog v0.1.9 h1:vLvSDpP7kihFGKFAvBSofYo7qZNULYSHOH2D7rPTKJk=
github.com/kataras/golog v0.1.9/go.mod h1:jlpk/bOaYCyqDqH18pgDHdaJab72yBE6i0O3s30hpWY=
github.com/kataras/pio v0.0.12 h1:o52SfVYauS3J5X08fNjlGS5arXHjW/ItLkyLcKjoH6w=
github.com/kataras/pio v0.0.12/go.mod h1:ODK/8XBhhQ5WqrAhKy+9lTPS7sBf6O3KcLhc9klfRcY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -296,6 +303,12 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pluralsh/console-client-go v0.0.18 h1:GhfThwExfyQbU+NjMLdxH0KPuVJjdsI5NdK0cdTcakA=
github.com/pluralsh/console-client-go v0.0.18/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
github.com/pluralsh/console-client-go v0.0.21 h1:CacHCqes793bOJbE5pFj3q+P9mSUevPk/IjbI/lF6tw=
github.com/pluralsh/console-client-go v0.0.21/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
github.com/pluralsh/gophoenix v0.1.3-0.20231024044735-d0f772170497 h1:FIrclcMZV27uUoOWGq7bBSi8cDTbJrCzoXRZklnnET8=
github.com/pluralsh/gophoenix v0.1.3-0.20231024044735-d0f772170497/go.mod h1:IagWXKFYu6NTHzcJx2dJyrIlZ1Sv2PH3fhOtplA9qOs=
github.com/pluralsh/gophoenix v0.1.3-0.20231024165338-04291b4de463 h1:PtXJG2hM73c98bqYAaFNtY+eJ1RupEArHSarNzi3Hek=
github.com/pluralsh/gophoenix v0.1.3-0.20231024165338-04291b4de463/go.mod h1:IagWXKFYu6NTHzcJx2dJyrIlZ1Sv2PH3fhOtplA9qOs=
github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY=
github.com/pluralsh/polly v0.1.4/go.mod h1:Yo1/jcW+4xwhWG+ZJikZy4J4HJkMNPZ7sq5auL2c/tY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -538,6 +551,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/manifests"
deploysync "github.com/pluralsh/deployment-operator/pkg/sync"
"github.com/pluralsh/deployment-operator/pkg/websocket"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/discovery"
Expand All @@ -29,10 +30,11 @@ type Agent struct {
engine *deploysync.Engine
deathChan chan interface{}
svcQueue workqueue.RateLimitingInterface
socket *websocket.Socket
refresh time.Duration
}

func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken string) (*Agent, error) {
func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken, clusterId string) (*Agent, error) {
dc, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
Expand All @@ -45,6 +47,14 @@ func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken str
deathChan := make(chan interface{})
invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone}

socket, err := websocket.New(clusterId, consoleUrl, deployToken, svcQueue)
if err != nil {
if socket == nil {
return nil, err
}
log.Error(err, "could not initiate websocket connection, ignoring and falling back to polling")
}

f := newFactory(config)

applier, err := newApplier(invFactory, f)
Expand All @@ -60,6 +70,7 @@ func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken str
engine: engine,
deathChan: deathChan,
svcQueue: svcQueue,
socket: socket,
refresh: refresh,
}, nil
}
Expand All @@ -75,6 +86,10 @@ func (agent *Agent) Run() {
}
}()

if err := agent.socket.Join(); err != nil {
log.Error(err, "could not establish websocket to upstream")
}

err := wait.PollInfinite(agent.refresh, func() (done bool, err error) {
log.Info("fetching services for cluster")
svcs, err := agent.consoleClient.GetServices()
Expand Down
76 changes: 76 additions & 0 deletions pkg/websocket/socket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package websocket

import (
"fmt"
"net/http"
"net/url"

phx "github.com/pluralsh/gophoenix"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2/klogr"
)

var (
log = klogr.New()
)

type Socket struct {
clusterId string
client *phx.Client
svcQueue workqueue.RateLimitingInterface
channel *phx.Channel
}

func New(clusterId, consoleUrl, deployToken string, svcQueue workqueue.RateLimitingInterface) (*Socket, error) {
socket := &Socket{svcQueue: svcQueue, clusterId: clusterId}
client := phx.NewClient(socket)

uri, err := wssUri(consoleUrl, deployToken)
if err != nil {
return nil, err
}

err = client.Connect(*uri, http.Header{})
socket.client = client
return socket, err
}

func (s *Socket) Join() error {
channel, err := s.client.Join(s, fmt.Sprintf("cluster:%s", s.clusterId), map[string]string{})
s.channel = channel
return err
}

func wssUri(consoleUrl, deployToken string) (*url.URL, error) {
uri, err := url.Parse(consoleUrl)
if err != nil {
return nil, err
}
wssUrl := fmt.Sprintf("wss://%s/ext/socket/websocket", uri.Host)
values, err := url.ParseQuery("vsn=2.0.0")
if err != nil {
return nil, err
}

values.Add("token", deployToken)
finalUrl := fmt.Sprintf("%s?%s", wssUrl, values.Encode())
return uri.Parse(finalUrl)
}

func (s *Socket) NotifyConnect() {}
func (s *Socket) NotifyDisconnect() {}

// implement ChannelReceiver
func (s *Socket) OnJoin(payload interface{}) {}
func (s *Socket) OnJoinError(payload interface{}) {}
func (s *Socket) OnChannelClose(payload interface{}, joinRef int64) {}
func (s *Socket) OnMessage(ref int64, event string, payload interface{}) {
if event == "service.event" {
if parsed, ok := payload.(map[string]interface{}); ok {
if id, ok := parsed["id"].(string); ok {
log.Info("got new service update from websocket", "id", id)
s.svcQueue.Add(id)
}
}
}
}

0 comments on commit e592786

Please sign in to comment.