Skip to content

Commit

Permalink
Separate the cache from the relayer for auto reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes committed Oct 9, 2024
1 parent ca3ee23 commit 4b4047d
Showing 1 changed file with 76 additions and 44 deletions.
120 changes: 76 additions & 44 deletions protocol/performance/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package performance

import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -12,22 +11,47 @@ import (
pairingtypes "github.com/lavanet/lava/v3/x/pairing/types"
)

type Cache struct {
type relayerCacheClientStore struct {
client pairingtypes.RelayerCacheClient
lock sync.RWMutex
ctx context.Context
address string
serviceCtx context.Context
reconnecting atomic.Bool
}

const (
ReconnectInterval = 5 * time.Second
reconnectInterval = 5 * time.Second
)

func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string) (*pairingtypes.RelayerCacheClient, error) {
connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
func newRelayerCacheClientStore(ctx context.Context, address string) (*relayerCacheClientStore, error) {
clientStore := &relayerCacheClientStore{
client: nil,
ctx: ctx,
address: address,
}
return clientStore, clientStore.connectClient()
}

func (r *relayerCacheClientStore) getClient() pairingtypes.RelayerCacheClient {
if r == nil {
return nil
}

r.lock.RLock()
defer r.lock.RUnlock()

if r.client == nil {
go r.reconnectClient()
}

return r.client // might be nil
}

func (r *relayerCacheClientStore) connectGRPCConnectionToRelayerCacheService() (*pairingtypes.RelayerCacheClient, error) {
connectCtx, cancel := context.WithTimeout(r.ctx, 3*time.Second)
defer cancel()

conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true, false)
conn, err := lavasession.ConnectGRPCClient(connectCtx, r.address, false, true, false)
if err != nil {
return nil, err
}
Expand All @@ -37,84 +61,92 @@ func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string
return &c, nil
}

func InitCache(ctx context.Context, addr string) (*Cache, error) {
relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(ctx, addr)
if err != nil {
cache := &Cache{client: nil, address: addr, serviceCtx: ctx}
go cache.reconnectLoop()
return cache, err
func (r *relayerCacheClientStore) connectClient() error {
relayerCacheClient, err := r.connectGRPCConnectionToRelayerCacheService()
if err == nil {
utils.LavaFormatInfo("Connected to cache service", utils.LogAttr("address", r.address))
func() {
r.lock.Lock()
defer r.lock.Unlock()
r.client = *relayerCacheClient
}()

r.reconnecting.Store(false)
return nil // connected
}
cache := Cache{client: *relayerCacheClient, address: addr, serviceCtx: ctx}
return &cache, nil

utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", r.address), utils.LogAttr("error", err))
return err
}

func (cache *Cache) reconnectLoop() {
func (r *relayerCacheClientStore) reconnectClient() {
// This is a simple atomic operation to ensure that only one goroutine is reconnecting at a time.
// reconnecting.CompareAndSwap(false, true):
// if reconnecting == false {
// reconnecting = true
// return true -> reconnect
// }
// return false -> already reconnecting
if !cache.reconnecting.CompareAndSwap(false, true) {
if !r.reconnecting.CompareAndSwap(false, true) {
return
}

for {
select {
case <-cache.serviceCtx.Done():
case <-r.ctx.Done():
return
case <-time.After(ReconnectInterval):
relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(cache.serviceCtx, cache.address)
if err == nil {
utils.LavaFormatInfo("Connection to cache service restored", utils.LogAttr("address", cache.address))
cache.client = *relayerCacheClient
cache.reconnecting.Store(false)
return // connected
} else {
utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", cache.address), utils.LogAttr("error", err))
case <-time.After(reconnectInterval):
if r.connectClient() != nil {
return
}
}
}
}

func (cache *Cache) reconnectIfNeeded(err error) {
if err != nil && (errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "connection refused")) {
utils.LavaFormatDebug("Cache connection failed, reconnecting", utils.LogAttr("address", cache.address), utils.LogAttr("error", err))
go cache.reconnectLoop()
}
type Cache struct {
clientStore *relayerCacheClientStore
address string
serviceCtx context.Context
}

func InitCache(ctx context.Context, addr string) (*Cache, error) {
clientStore, err := newRelayerCacheClientStore(ctx, addr)
return &Cache{
clientStore: clientStore,
address: addr,
serviceCtx: ctx,
}, err

Check failure on line 119 in protocol/performance/cache.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
}

Check failure on line 120 in protocol/performance/cache.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

func (cache *Cache) GetEntry(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (reply *pairingtypes.CacheRelayReply, err error) {
if cache == nil {
return nil, NotInitializedError
}

if cache.client == nil {
go cache.reconnectLoop()
return nil, NotConnectedError.Wrapf("No client connected to address: %s", cache.address)
client := cache.clientStore.getClient()
if client == nil {
return nil, NotConnectedError
}

reply, err = cache.client.GetRelay(ctx, relayCacheGet)
cache.reconnectIfNeeded(err)
reply, err = client.GetRelay(ctx, relayCacheGet)
return reply, err
}

func (cache *Cache) CacheActive() bool {
return cache != nil && cache.client != nil
return cache != nil && cache.clientStore.getClient() != nil
}

func (cache *Cache) SetEntry(ctx context.Context, cacheSet *pairingtypes.RelayCacheSet) error {
if cache == nil {
return NotInitializedError
}

if cache.client == nil {
go cache.reconnectLoop()
return NotConnectedError.Wrapf("No client connected to address: %s", cache.address)
client := cache.clientStore.getClient()
if client == nil {
return NotConnectedError
}

_, err := cache.client.SetRelay(ctx, cacheSet)
cache.reconnectIfNeeded(err)
_, err := client.SetRelay(ctx, cacheSet)
return err
}

0 comments on commit 4b4047d

Please sign in to comment.