Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT - Implement cache reconnect mechanism + Cache E2E #1734

Merged
merged 10 commits into from
Oct 30, 2024
131 changes: 108 additions & 23 deletions protocol/performance/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,56 @@ package performance

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
)

type Cache struct {
client pairingtypes.RelayerCacheClient
address string
type relayerCacheClientStore struct {
client pairingtypes.RelayerCacheClient
lock sync.RWMutex
ctx context.Context
address string
reconnecting atomic.Bool
}

func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string) (*pairingtypes.RelayerCacheClient, error) {
connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
const (
reconnectInterval = 5 * time.Second
)

func newRelayerCacheClientStore(ctx context.Context, address string) (*relayerCacheClientStore, error) {
clientStore := &relayerCacheClientStore{
client: nil,
ctx: ctx,
address: address,
}
return clientStore, clientStore.connectClient()
}

func (r *relayerCacheClientStore) getClient() pairingtypes.RelayerCacheClient {
if r == nil {
return nil
}

r.lock.RLock()
defer r.lock.RUnlock()

if r.client == nil {
go r.reconnectClient()
}

return r.client // might be nil
}

func (r *relayerCacheClientStore) connectGRPCConnectionToRelayerCacheService() (*pairingtypes.RelayerCacheClient, error) {
connectCtx, cancel := context.WithTimeout(r.ctx, 3*time.Second)
defer cancel()

conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true, false)
conn, err := lavasession.ConnectGRPCClient(connectCtx, r.address, false, true, false)
if err != nil {
return nil, err
}
Expand All @@ -27,40 +61,91 @@ func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string
return &c, nil
}

func InitCache(ctx context.Context, addr string) (*Cache, error) {
relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(ctx, addr)
if err != nil {
return &Cache{client: nil, address: addr}, err
func (r *relayerCacheClientStore) connectClient() error {
relayerCacheClient, err := r.connectGRPCConnectionToRelayerCacheService()
if err == nil {
utils.LavaFormatInfo("Connected to cache service", utils.LogAttr("address", r.address))
func() {
r.lock.Lock()
defer r.lock.Unlock()
r.client = *relayerCacheClient
}()

r.reconnecting.Store(false)
return nil // connected
}
cache := Cache{client: *relayerCacheClient, address: addr}
return &cache, nil

utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", r.address), utils.LogAttr("error", err))
return err
}

func (r *relayerCacheClientStore) reconnectClient() {
// This is a simple atomic operation to ensure that only one goroutine is reconnecting at a time.
// reconnecting.CompareAndSwap(false, true):
// if reconnecting == false {
// reconnecting = true
// return true -> reconnect
// }
// return false -> already reconnecting
if !r.reconnecting.CompareAndSwap(false, true) {
return
}

for {
select {
case <-r.ctx.Done():
return
case <-time.After(reconnectInterval):
if r.connectClient() != nil {
return
}
}
}
}

type Cache struct {
clientStore *relayerCacheClientStore
address string
serviceCtx context.Context
}

func InitCache(ctx context.Context, addr string) (*Cache, error) {
clientStore, err := newRelayerCacheClientStore(ctx, addr)
return &Cache{
clientStore: clientStore,
address: addr,
serviceCtx: ctx,
}, err
}

func (cache *Cache) GetEntry(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (reply *pairingtypes.CacheRelayReply, err error) {
if cache == nil {
// TODO: try to connect again once in a while
return nil, NotInitializedError
}
if cache.client == nil {
return nil, NotConnectedError.Wrapf("No client connected to address: %s", cache.address)

client := cache.clientStore.getClient()
if client == nil {
return nil, NotConnectedError
}
// TODO: handle disconnections and error types here
return cache.client.GetRelay(ctx, relayCacheGet)

reply, err = client.GetRelay(ctx, relayCacheGet)
return reply, err
}

func (cache *Cache) CacheActive() bool {
return cache != nil
return cache != nil && cache.clientStore.getClient() != nil
}

func (cache *Cache) SetEntry(ctx context.Context, cacheSet *pairingtypes.RelayCacheSet) error {
if cache == nil {
// TODO: try to connect again once in a while
return NotInitializedError
}
if cache.client == nil {
return NotConnectedError.Wrapf("No client connected to address: %s", cache.address)

client := cache.clientStore.getClient()
if client == nil {
return NotConnectedError
}
// TODO: handle disconnections and SetRelay error types here
_, err := cache.client.SetRelay(ctx, cacheSet)

_, err := client.SetRelay(ctx, cacheSet)
return err
}
2 changes: 1 addition & 1 deletion protocol/performance/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import (

var (
NotConnectedError = sdkerrors.New("Not Connected Error", 700, "No Connection To grpc server")
NotInitializedError = sdkerrors.New("Not Initialised Error", 701, "to use cache run initCache")
NotInitializedError = sdkerrors.New("Not Initialized Error", 701, "to use cache run initCache")
)
1 change: 1 addition & 0 deletions testutil/e2e/allowedErrorList.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var allowedErrors = map[string]string{
"purging provider after all endpoints are disabled provider": "This error is allowed because it is caused by the initial bootup, continuous failure would be caught by the e2e so we can allowed this error.",
"Provider Side Failed Sending Message, Reason: Unavailable": "This error is allowed because it is caused by the lavad restart to turn on emergency mode",
"Maximum cu exceeded PrepareSessionForUsage": "This error is allowed because it is caused by switching between providers, continuous failure would be caught by the e2e so we can allowed this error.",
"Failed To Connect to cache at address": "This error is allowed because it is caused by cache being connected only during the test and not during the bootup",
}

var allowedErrorsDuringEmergencyMode = map[string]string{
Expand Down
Loading
Loading