Skip to content

Commit

Permalink
Revert "removed pairing query cache"
Browse files Browse the repository at this point in the history
This reverts commit 0e9e66b.
  • Loading branch information
oren-lava committed Sep 10, 2024
1 parent 0e9e66b commit c193ebf
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 5 deletions.
8 changes: 8 additions & 0 deletions x/pairing/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
epochstoragetypes "github.com/lavanet/lava/v3/x/epochstorage/types"
timerstoretypes "github.com/lavanet/lava/v3/x/timerstore/types"

"github.com/cometbft/cometbft/libs/log"
Expand Down Expand Up @@ -34,6 +35,8 @@ type (
downtimeKeeper types.DowntimeKeeper
dualstakingKeeper types.DualstakingKeeper
stakingKeeper types.StakingKeeper

pairingQueryCache *map[string][]epochstoragetypes.StakeEntry
}
)

Expand Down Expand Up @@ -71,6 +74,8 @@ func NewKeeper(
ps = ps.WithKeyTable(types.ParamKeyTable())
}

emptypairingQueryCache := map[string][]epochstoragetypes.StakeEntry{}

keeper := &Keeper{
cdc: cdc,
storeKey: storeKey,
Expand All @@ -86,6 +91,7 @@ func NewKeeper(
downtimeKeeper: downtimeKeeper,
dualstakingKeeper: dualstakingKeeper,
stakingKeeper: stakingKeeper,
pairingQueryCache: &emptypairingQueryCache,
}

// note that the timer and badgeUsedCu keys are the same (so we can use only the second arg)
Expand All @@ -107,6 +113,8 @@ func (k Keeper) Logger(ctx sdk.Context) log.Logger {

func (k Keeper) BeginBlock(ctx sdk.Context) {
if k.epochStorageKeeper.IsEpochStart(ctx) {
// reset pairing query cache every epoch
*k.pairingQueryCache = map[string][]epochstoragetypes.StakeEntry{}
// remove old session payments
k.RemoveOldEpochPayments(ctx)
// unstake/jail unresponsive providers
Expand Down
28 changes: 24 additions & 4 deletions x/pairing/keeper/pairing.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (k Keeper) GetPairingForClient(ctx sdk.Context, chainID string, clientAddre
return nil, fmt.Errorf("invalid user for pairing: %s", err.Error())
}

providers, _, _, err = k.getPairingForClient(ctx, chainID, block, strictestPolicy, cluster, project.Index, false)
providers, _, _, err = k.getPairingForClient(ctx, chainID, block, strictestPolicy, cluster, project.Index, false, true)

return providers, err
}
Expand All @@ -90,7 +90,7 @@ func (k Keeper) CalculatePairingChance(ctx sdk.Context, provider string, chainID
totalScore := cosmosmath.ZeroUint()
providerScore := cosmosmath.ZeroUint()

_, _, scores, err := k.getPairingForClient(ctx, chainID, uint64(ctx.BlockHeight()), policy, cluster, "dummy", true)
_, _, scores, err := k.getPairingForClient(ctx, chainID, uint64(ctx.BlockHeight()), policy, cluster, "dummy", true, false)
if err != nil {
return cosmosmath.LegacyZeroDec(), err
}
Expand All @@ -117,12 +117,22 @@ func (k Keeper) CalculatePairingChance(ctx sdk.Context, provider string, chainID

// function used to get a new pairing from provider and client
// first argument has all metadata, second argument is only the addresses
func (k Keeper) getPairingForClient(ctx sdk.Context, chainID string, block uint64, policy *planstypes.Policy, cluster string, projectIndex string, calcChance bool) (providers []epochstoragetypes.StakeEntry, allowedCU uint64, providerScores []*pairingscores.PairingScore, errorRet error) {
// useCache is a boolean argument that is used to determine whether pairing cache should be used
// Note: useCache should only be true for queries! functions that write to the state and use this function should never put useCache=true
func (k Keeper) getPairingForClient(ctx sdk.Context, chainID string, block uint64, policy *planstypes.Policy, cluster string, projectIndex string, calcChance bool, useCache bool) (providers []epochstoragetypes.StakeEntry, allowedCU uint64, providerScores []*pairingscores.PairingScore, errorRet error) {
epoch, providersType, err := k.VerifyPairingData(ctx, chainID, block)
if err != nil {
return nil, 0, nil, fmt.Errorf("invalid pairing data: %s", err)
}

// to be used only in queries as this changes gas calculations, and therefore must not be part of consensus
if useCache {
providers, found := k.GetPairingQueryCache(projectIndex, chainID, epoch)
if found {
return providers, policy.EpochCuLimit, nil, nil
}
}

stakeEntries := k.epochStorageKeeper.GetAllStakeEntriesForEpochChainId(ctx, epoch, chainID)
if len(stakeEntries) == 0 {
return nil, 0, nil, fmt.Errorf("did not find providers for pairing: epoch:%d, chainID: %s", block, chainID)
Expand All @@ -139,6 +149,9 @@ func (k Keeper) getPairingForClient(ctx sdk.Context, chainID string, block uint6
stakeEntriesFiltered = append(stakeEntriesFiltered, stakeEntries[i])
}
}
if useCache {
k.SetPairingQueryCache(projectIndex, chainID, epoch, stakeEntriesFiltered)
}
return stakeEntriesFiltered, policy.EpochCuLimit, nil, nil
}

Expand All @@ -158,6 +171,9 @@ func (k Keeper) getPairingForClient(ctx sdk.Context, chainID string, block uint6
for _, score := range providerScores {
filteredEntries = append(filteredEntries, *score.Provider)
}
if useCache {
k.SetPairingQueryCache(projectIndex, chainID, epoch, filteredEntries)
}
return filteredEntries, policy.EpochCuLimit, nil, nil
}

Expand All @@ -178,6 +194,10 @@ func (k Keeper) getPairingForClient(ctx sdk.Context, chainID string, block uint6
prevGroupSlot = group
}

if useCache {
k.SetPairingQueryCache(projectIndex, chainID, epoch, providers)
}

return providers, policy.EpochCuLimit, providerScores, nil
}

Expand Down Expand Up @@ -330,7 +350,7 @@ func (k Keeper) ValidatePairingForClient(ctx sdk.Context, chainID string, provid
return false, allowedCU, []epochstoragetypes.StakeEntry{}, fmt.Errorf("invalid user for pairing: %s", err.Error())
}

validAddresses, allowedCU, _, err = k.getPairingForClient(ctx, chainID, epoch, strictestPolicy, cluster, project.Index, false)
validAddresses, allowedCU, _, err = k.getPairingForClient(ctx, chainID, epoch, strictestPolicy, cluster, project.Index, false, false)
if err != nil {
return false, allowedCU, []epochstoragetypes.StakeEntry{}, err
}
Expand Down
24 changes: 24 additions & 0 deletions x/pairing/keeper/pairing_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,27 @@ func (k Keeper) ResetPairingRelayCache(ctx sdk.Context) {
store.Delete(iterator.Key())
}
}

// the cache used for the query, does not write into state
func (k Keeper) SetPairingQueryCache(project string, chainID string, epoch uint64, pairedProviders []epochstoragetypes.StakeEntry) {
if k.pairingQueryCache == nil {
// pairing cache is not initialized, will be in next epoch so simply skip
return
}
key := types.NewPairingCacheKey(project, chainID, epoch)

(*k.pairingQueryCache)[key] = pairedProviders
}

func (k Keeper) GetPairingQueryCache(project string, chainID string, epoch uint64) ([]epochstoragetypes.StakeEntry, bool) {
if k.pairingQueryCache == nil {
// pairing cache is not initialized, will be in next epoch so simply skip
return nil, false
}
key := types.NewPairingCacheKey(project, chainID, epoch)
if providers, ok := (*k.pairingQueryCache)[key]; ok {
return providers, true
}

return nil, false
}
39 changes: 38 additions & 1 deletion x/pairing/keeper/pairing_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,44 @@ import (
"github.com/stretchr/testify/require"
)

// TestPairingRelayCache tests the following:
// TestPairingQueryCache tests the following:
// 1. The pairing query cache is reset every epoch
// 2. Getting pairing with a query using an existent cache entry consumes fewer gas than without one
func TestPairingQueryCache(t *testing.T) {
ts := newTester(t)
ts.setupForPayments(1, 1, 0) // 1 provider, 1 client, default providers-to-pair

_, consumer := ts.GetAccount(common.CONSUMER, 0)

getPairingGas := func(ts *tester) uint64 {
gm := ts.Ctx.GasMeter()
before := gm.GasConsumed()
_, err := ts.QueryPairingGetPairing(ts.spec.Index, consumer)
require.NoError(t, err)
return gm.GasConsumed() - before
}

// query for pairing for the first time - empty cache
emptyCacheGas := getPairingGas(ts)

// query for pairing for the second time - non-empty cache
filledCacheGas := getPairingGas(ts)

// second time gas should be smaller than first time
require.Less(t, filledCacheGas, emptyCacheGas)

// advance block to test it stays the same (should still be less than empty cache gas)
ts.AdvanceBlock()
filledAfterBlockCacheGas := getPairingGas(ts)
require.Less(t, filledAfterBlockCacheGas, emptyCacheGas)

// advance epoch to reset the cache
ts.AdvanceEpoch()
emptyCacheAgainGas := getPairingGas(ts)
require.Equal(t, emptyCacheGas, emptyCacheAgainGas)
}

// TestPairingQueryCache tests the following:
// 1. The pairing relay cache is reset every block
// 2. Getting pairing in relay payment using an existent cache entry consumes fewer gas than without one
func TestPairingRelayCache(t *testing.T) {
Expand Down

0 comments on commit c193ebf

Please sign in to comment.