diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 7da65e9e..fc8f07bf 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -47,7 +47,7 @@ func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken, cl deathChan := make(chan interface{}) invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone} - socket, err := websocket.New(clusterId, consoleUrl, deployToken, svcQueue) + socket, err := websocket.New(clusterId, consoleUrl, deployToken, svcQueue, svcCache) if err != nil { if socket == nil { return nil, err diff --git a/pkg/client/cache.go b/pkg/client/cache.go index 468412a6..d1fb2990 100644 --- a/pkg/client/cache.go +++ b/pkg/client/cache.go @@ -50,6 +50,10 @@ func (c *ServiceCache) Wipe() { c.cache.Clear() } +func (c *ServiceCache) Expire(id string) { + c.cache.Remove(id) +} + func (l *cacheLine) live(dur time.Duration) bool { return l.created.After(time.Now().Add(-dur)) } diff --git a/pkg/websocket/socket.go b/pkg/websocket/socket.go index 392ab031..4104f164 100644 --- a/pkg/websocket/socket.go +++ b/pkg/websocket/socket.go @@ -5,6 +5,7 @@ import ( "net/http" "net/url" + "github.com/pluralsh/deployment-operator/pkg/client" phx "github.com/pluralsh/gophoenix" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2/klogr" @@ -18,11 +19,12 @@ type Socket struct { clusterId string client *phx.Client svcQueue workqueue.RateLimitingInterface + svcCache *client.ServiceCache channel *phx.Channel } -func New(clusterId, consoleUrl, deployToken string, svcQueue workqueue.RateLimitingInterface) (*Socket, error) { - socket := &Socket{svcQueue: svcQueue, clusterId: clusterId} +func New(clusterId, consoleUrl, deployToken string, svcQueue workqueue.RateLimitingInterface, svcCache *client.ServiceCache) (*Socket, error) { + socket := &Socket{svcQueue: svcQueue, clusterId: clusterId, svcCache: svcCache} client := phx.NewClient(socket) uri, err := wssUri(consoleUrl, deployToken) @@ -69,6 +71,7 @@ func (s *Socket) OnMessage(ref int64, event string, payload interface{}) { if parsed, ok := payload.(map[string]interface{}); ok { if id, ok := parsed["id"].(string); ok { log.Info("got new service update from websocket", "id", id) + s.svcCache.Expire(id) s.svcQueue.Add(id) } }