From 4b4047d5dffac13b491ce1dc5c92167bbc4c76a8 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Wed, 9 Oct 2024 16:11:13 +0300 Subject: [PATCH] Separate the cache from the relayer for auto reconnection --- protocol/performance/cache.go | 120 +++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 44 deletions(-) diff --git a/protocol/performance/cache.go b/protocol/performance/cache.go index 0dee41bf46..0db0a499d2 100644 --- a/protocol/performance/cache.go +++ b/protocol/performance/cache.go @@ -2,8 +2,7 @@ package performance import ( "context" - "errors" - "strings" + "sync" "sync/atomic" "time" @@ -12,22 +11,47 @@ import ( pairingtypes "github.com/lavanet/lava/v3/x/pairing/types" ) -type Cache struct { +type relayerCacheClientStore struct { client pairingtypes.RelayerCacheClient + lock sync.RWMutex + ctx context.Context address string - serviceCtx context.Context reconnecting atomic.Bool } const ( - ReconnectInterval = 5 * time.Second + reconnectInterval = 5 * time.Second ) -func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string) (*pairingtypes.RelayerCacheClient, error) { - connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second) +func newRelayerCacheClientStore(ctx context.Context, address string) (*relayerCacheClientStore, error) { + clientStore := &relayerCacheClientStore{ + client: nil, + ctx: ctx, + address: address, + } + return clientStore, clientStore.connectClient() +} + +func (r *relayerCacheClientStore) getClient() pairingtypes.RelayerCacheClient { + if r == nil { + return nil + } + + r.lock.RLock() + defer r.lock.RUnlock() + + if r.client == nil { + go r.reconnectClient() + } + + return r.client // might be nil +} + +func (r *relayerCacheClientStore) connectGRPCConnectionToRelayerCacheService() (*pairingtypes.RelayerCacheClient, error) { + connectCtx, cancel := context.WithTimeout(r.ctx, 3*time.Second) defer cancel() - conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true, false) + conn, err := lavasession.ConnectGRPCClient(connectCtx, r.address, false, true, false) if err != nil { return nil, err } @@ -37,18 +61,25 @@ func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string return &c, nil } -func InitCache(ctx context.Context, addr string) (*Cache, error) { - relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(ctx, addr) - if err != nil { - cache := &Cache{client: nil, address: addr, serviceCtx: ctx} - go cache.reconnectLoop() - return cache, err +func (r *relayerCacheClientStore) connectClient() error { + relayerCacheClient, err := r.connectGRPCConnectionToRelayerCacheService() + if err == nil { + utils.LavaFormatInfo("Connected to cache service", utils.LogAttr("address", r.address)) + func() { + r.lock.Lock() + defer r.lock.Unlock() + r.client = *relayerCacheClient + }() + + r.reconnecting.Store(false) + return nil // connected } - cache := Cache{client: *relayerCacheClient, address: addr, serviceCtx: ctx} - return &cache, nil + + utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", r.address), utils.LogAttr("error", err)) + return err } -func (cache *Cache) reconnectLoop() { +func (r *relayerCacheClientStore) reconnectClient() { // This is a simple atomic operation to ensure that only one goroutine is reconnecting at a time. // reconnecting.CompareAndSwap(false, true): // if reconnecting == false { @@ -56,33 +87,36 @@ func (cache *Cache) reconnectLoop() { // return true -> reconnect // } // return false -> already reconnecting - if !cache.reconnecting.CompareAndSwap(false, true) { + if !r.reconnecting.CompareAndSwap(false, true) { return } for { select { - case <-cache.serviceCtx.Done(): + case <-r.ctx.Done(): return - case <-time.After(ReconnectInterval): - relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(cache.serviceCtx, cache.address) - if err == nil { - utils.LavaFormatInfo("Connection to cache service restored", utils.LogAttr("address", cache.address)) - cache.client = *relayerCacheClient - cache.reconnecting.Store(false) - return // connected - } else { - utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", cache.address), utils.LogAttr("error", err)) + case <-time.After(reconnectInterval): + if r.connectClient() != nil { + return } } } } -func (cache *Cache) reconnectIfNeeded(err error) { - if err != nil && (errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "connection refused")) { - utils.LavaFormatDebug("Cache connection failed, reconnecting", utils.LogAttr("address", cache.address), utils.LogAttr("error", err)) - go cache.reconnectLoop() - } +type Cache struct { + clientStore *relayerCacheClientStore + address string + serviceCtx context.Context +} + +func InitCache(ctx context.Context, addr string) (*Cache, error) { + clientStore, err := newRelayerCacheClientStore(ctx, addr) + return &Cache{ + clientStore: clientStore, + address: addr, + serviceCtx: ctx, + }, err + } func (cache *Cache) GetEntry(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (reply *pairingtypes.CacheRelayReply, err error) { @@ -90,18 +124,17 @@ func (cache *Cache) GetEntry(ctx context.Context, relayCacheGet *pairingtypes.Re return nil, NotInitializedError } - if cache.client == nil { - go cache.reconnectLoop() - return nil, NotConnectedError.Wrapf("No client connected to address: %s", cache.address) + client := cache.clientStore.getClient() + if client == nil { + return nil, NotConnectedError } - reply, err = cache.client.GetRelay(ctx, relayCacheGet) - cache.reconnectIfNeeded(err) + reply, err = client.GetRelay(ctx, relayCacheGet) return reply, err } func (cache *Cache) CacheActive() bool { - return cache != nil && cache.client != nil + return cache != nil && cache.clientStore.getClient() != nil } func (cache *Cache) SetEntry(ctx context.Context, cacheSet *pairingtypes.RelayCacheSet) error { @@ -109,12 +142,11 @@ func (cache *Cache) SetEntry(ctx context.Context, cacheSet *pairingtypes.RelayCa return NotInitializedError } - if cache.client == nil { - go cache.reconnectLoop() - return NotConnectedError.Wrapf("No client connected to address: %s", cache.address) + client := cache.clientStore.getClient() + if client == nil { + return NotConnectedError } - _, err := cache.client.SetRelay(ctx, cacheSet) - cache.reconnectIfNeeded(err) + _, err := client.SetRelay(ctx, cacheSet) return err }