Skip to content

Commit

Permalink
PRT-1166 cache hash key migration. (#1300)
Browse files Browse the repository at this point in the history
* cache WIP, hash key migration.

* fix hashing requested block.

* fix lint

* fix test errors and nil pointers

* fixing setting data before changing it.

* fix seen block bug.

* fix cache bug when requested block is negative and missing handling for -4 requests (pending)

* rename

* cache improvements for TTL and unmarshalling

* fix lint

* cache seen block edge case, and adding chain id prints to errors

* fix grpc bandwidth size.

* bug fix

* adding max call size to all grpc connections

* fix seen block state machine, fix latency for lava block fetching, fix provider proto deep copying.

* add cache to chain fetcher and fix edge case

* fix unhandled seenBlock on response larger than seen scenario

* update upgrade params
  • Loading branch information
ranlavanet authored Mar 18, 2024
1 parent e74973d commit b3bec20
Show file tree
Hide file tree
Showing 30 changed files with 1,014 additions and 565 deletions.
6 changes: 4 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ linters:
- stylecheck
- typecheck
- unconvert
#- unused we have a lot of unused code that we dont want to delete
#- unused we have a lot of unused code that we don't want to delete
- forcetypeassert
- gofmt
- goimports
Expand All @@ -56,6 +56,9 @@ issues:
- text: "ST1016:"
linters:
- stylecheck
- text: "SA1019:.*\"github.com/golang/protobuf/proto\" is deprecated.*" # proto is deprecated, but some places couldn't be removed
linters:
- staticcheck
- path: "migrations"
text: "SA1019:"
linters:
Expand All @@ -72,6 +75,5 @@ linters-settings:
suggest-new: true
nolintlint:
allow-unused: false
allow-leading-space: true
require-explanation: false
require-specific: false
142 changes: 84 additions & 58 deletions ecosystem/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/lavanet/lava/ecosystem/cache"
"github.com/lavanet/lava/ecosystem/cache/format"
"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
spectypes "github.com/lavanet/lava/x/spec/types"
Expand All @@ -30,7 +31,7 @@ const (
func initTest() (context.Context, *cache.RelayerCacheServer) {
ctx := context.Background()
cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024}
cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DisabledFlagOption, true)
cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DisabledFlagOption)
cacheServer := &cache.RelayerCacheServer{CacheServer: &cs}
return ctx, cacheServer
}
Expand Down Expand Up @@ -62,11 +63,12 @@ func TestCacheSetGet(t *testing.T) {
response := &pairingtypes.RelayReply{}

messageSet := pairingtypes.RelayCacheSet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}

_, err := cacheServer.SetRelay(ctx, &messageSet)
Expand All @@ -77,10 +79,11 @@ func TestCacheSetGet(t *testing.T) {
// now to get it

messageGet := pairingtypes.RelayCacheGet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}
_, err = cacheServer.GetRelay(ctx, &messageGet)
if tt.valid {
Expand Down Expand Up @@ -121,6 +124,18 @@ func shallowCopy(request *pairingtypes.RelayPrivateData) *pairingtypes.RelayPriv
}
}

func HashRequest(t *testing.T, request *pairingtypes.RelayPrivateData, chainId string) []byte {
hash, _, err := chainlib.HashCacheRequest(request, chainId)
require.NoError(t, err)
return hash
}

func HashRequestFormatter(t *testing.T, request *pairingtypes.RelayPrivateData, chainId string) ([]byte, func([]byte) []byte) {
hash, outputFormatter, err := chainlib.HashCacheRequest(request, chainId)
require.NoError(t, err)
return hash, outputFormatter
}

func TestCacheGetWithoutSet(t *testing.T) {
t.Parallel()
tests := []struct {
Expand All @@ -147,12 +162,12 @@ func TestCacheGetWithoutSet(t *testing.T) {
request := getRequest(1230, []byte(StubSig), StubApiInterface)

// now to get it

messageGet := pairingtypes.RelayCacheGet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}
_, err := cacheServer.GetRelay(ctx, &messageGet)

Expand Down Expand Up @@ -196,11 +211,12 @@ func TestCacheFailSetWithInvalidRequestBlock(t *testing.T) {
response := &pairingtypes.RelayReply{}

messageSet := pairingtypes.RelayCacheSet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}

_, err := cacheServer.SetRelay(ctx, &messageSet)
Expand Down Expand Up @@ -278,11 +294,12 @@ func TestCacheSetGetLatest(t *testing.T) {

response := &pairingtypes.RelayReply{LatestBlock: tt.latestBlockForSetRelay}
messageSet := pairingtypes.RelayCacheSet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}
_ = utils.LavaFormatDebug("next test", utils.Attribute{Key: "name", Value: tt.name})
_, err := cacheServer.SetRelay(ctx, &messageSet)
Expand All @@ -297,12 +314,15 @@ func TestCacheSetGetLatest(t *testing.T) {
// modify the request to get latest
request.RequestBlock = spectypes.LATEST_BLOCK
messageGet := pairingtypes.RelayCacheGet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}

// hashes needs to be equal.
require.Equal(t, messageGet.RequestHash, messageSet.RequestHash)
cacheReply, err := cacheServer.GetRelay(ctx, &messageGet)
if tt.valid {
require.NoError(t, err)
Expand Down Expand Up @@ -356,11 +376,12 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) {

response := &pairingtypes.RelayReply{LatestBlock: tt.latestBlockForSetRelay}
messageSet := pairingtypes.RelayCacheSet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}

_, err := cacheServer.SetRelay(ctx, &messageSet)
Expand All @@ -372,10 +393,11 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) {
// modify the request to get latest
request.RequestBlock = spectypes.LATEST_BLOCK
messageGet := pairingtypes.RelayCacheGet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}

cacheReply, err := cacheServer.GetRelay(ctx, &messageGet)
Expand All @@ -396,19 +418,21 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) {
request2.Data = []byte(StubData + "nonRelevantData")
response.LatestBlock = latestBlockForRelay + 1
messageSet2 := pairingtypes.RelayCacheSet{
Request: shallowCopy(request2),
BlockHash: tt.hash,
ChainID: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request2, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestedBlock: request2.RequestBlock,
}
_, err = cacheServer.SetRelay(ctx, &messageSet2) // we use this to increase latest block received
require.NoError(t, err)
messageGet = pairingtypes.RelayCacheGet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}
// repeat our latest block get, this time we expect it to look for a newer block and fail
_, err = cacheServer.GetRelay(ctx, &messageGet)
Expand Down Expand Up @@ -452,13 +476,13 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) {
response := &pairingtypes.RelayReply{
Data: formatIDInJsonResponse(id), // response has the old id when cached
}

messageSet := pairingtypes.RelayCacheSet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestHash: HashRequest(t, request, StubChainID),
BlockHash: tt.hash,
ChainId: StubChainID,
Response: response,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}

_, err := cacheServer.SetRelay(ctx, &messageSet)
Expand All @@ -471,15 +495,17 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) {
changedID := id + 1
// now we change the ID:
request.Data = formatIDInJson(changedID)

hash, outputFormatter := HashRequestFormatter(t, request, StubChainID)
messageGet := pairingtypes.RelayCacheGet{
Request: shallowCopy(request),
BlockHash: tt.hash,
ChainID: StubChainID,
Finalized: tt.finalized,
RequestHash: hash,
BlockHash: tt.hash,
ChainId: StubChainID,
Finalized: tt.finalized,
RequestedBlock: request.RequestBlock,
}
cacheReply, err := cacheServer.GetRelay(ctx, &messageGet)
if tt.valid {
cacheReply.Reply.Data = outputFormatter(cacheReply.Reply.Data)
require.NoError(t, err)
result := gjson.GetBytes(cacheReply.GetReply().Data, format.IDFieldName)
extractedID := result.Raw
Expand Down
1 change: 0 additions & 1 deletion ecosystem/cache/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,5 @@ longer DefaultExpirationForNonFinalized will reduce sync QoS for "latest" reques
cacheCmd.Flags().Duration(ExpirationNonFinalizedFlagName, DefaultExpirationForNonFinalized, "how long does a cache entry lasts in the cache for a non finalized entry")
cacheCmd.Flags().String(FlagMetricsAddress, DisabledFlagOption, "address to listen to prometheus metrics 127.0.0.1:5555, later you can curl http://127.0.0.1:5555/metrics")
cacheCmd.Flags().Int64(FlagCacheSizeName, 2*1024*1024*1024, "the maximal amount of entries to save")
cacheCmd.Flags().Bool(FlagUseMethodInApiSpecificCacheMetricsName, false, "use method in the cache specific api metric")
return cacheCmd
}
Loading

0 comments on commit b3bec20

Please sign in to comment.