Skip to content

Commit

Permalink
Bust service cache on incoming websocket (#70)
Browse files Browse the repository at this point in the history
At the moment the websocket might not matter that much, since it would just get served out of cache, we can confidently expire in that case.
  • Loading branch information
michaeljguarino authored Oct 25, 2023
1 parent 8ec5df5 commit bd4ac24
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func New(config *rest.Config, refresh time.Duration, consoleUrl, deployToken, cl
deathChan := make(chan interface{})
invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone}

socket, err := websocket.New(clusterId, consoleUrl, deployToken, svcQueue)
socket, err := websocket.New(clusterId, consoleUrl, deployToken, svcQueue, svcCache)
if err != nil {
if socket == nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (c *ServiceCache) Wipe() {
c.cache.Clear()
}

func (c *ServiceCache) Expire(id string) {
c.cache.Remove(id)
}

func (l *cacheLine) live(dur time.Duration) bool {
return l.created.After(time.Now().Add(-dur))
}
7 changes: 5 additions & 2 deletions pkg/websocket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/url"

"github.com/pluralsh/deployment-operator/pkg/client"
phx "github.com/pluralsh/gophoenix"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2/klogr"
Expand All @@ -18,11 +19,12 @@ type Socket struct {
clusterId string
client *phx.Client
svcQueue workqueue.RateLimitingInterface
svcCache *client.ServiceCache
channel *phx.Channel
}

func New(clusterId, consoleUrl, deployToken string, svcQueue workqueue.RateLimitingInterface) (*Socket, error) {
socket := &Socket{svcQueue: svcQueue, clusterId: clusterId}
func New(clusterId, consoleUrl, deployToken string, svcQueue workqueue.RateLimitingInterface, svcCache *client.ServiceCache) (*Socket, error) {
socket := &Socket{svcQueue: svcQueue, clusterId: clusterId, svcCache: svcCache}
client := phx.NewClient(socket)

uri, err := wssUri(consoleUrl, deployToken)
Expand Down Expand Up @@ -69,6 +71,7 @@ func (s *Socket) OnMessage(ref int64, event string, payload interface{}) {
if parsed, ok := payload.(map[string]interface{}); ok {
if id, ok := parsed["id"].(string); ok {
log.Info("got new service update from websocket", "id", id)
s.svcCache.Expire(id)
s.svcQueue.Add(id)
}
}
Expand Down

0 comments on commit bd4ac24

Please sign in to comment.