diff --git a/.golangci.yml b/.golangci.yml index 741ef44576..e362fef5d2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,7 +29,7 @@ linters: - stylecheck - typecheck - unconvert - #- unused we have a lot of unused code that we dont want to delete + #- unused we have a lot of unused code that we don't want to delete - forcetypeassert - gofmt - goimports @@ -56,6 +56,9 @@ issues: - text: "ST1016:" linters: - stylecheck + - text: "SA1019:.*\"github.com/golang/protobuf/proto\" is deprecated.*" # proto is deprecated, but some places couldn't be removed + linters: + - staticcheck - path: "migrations" text: "SA1019:" linters: @@ -72,6 +75,5 @@ linters-settings: suggest-new: true nolintlint: allow-unused: false - allow-leading-space: true require-explanation: false require-specific: false diff --git a/ecosystem/cache/cache_test.go b/ecosystem/cache/cache_test.go index f40e383222..59397de798 100644 --- a/ecosystem/cache/cache_test.go +++ b/ecosystem/cache/cache_test.go @@ -10,6 +10,7 @@ import ( "github.com/lavanet/lava/ecosystem/cache" "github.com/lavanet/lava/ecosystem/cache/format" + "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/utils" pairingtypes "github.com/lavanet/lava/x/pairing/types" spectypes "github.com/lavanet/lava/x/spec/types" @@ -30,7 +31,7 @@ const ( func initTest() (context.Context, *cache.RelayerCacheServer) { ctx := context.Background() cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024} - cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DisabledFlagOption, true) + cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DisabledFlagOption) cacheServer := &cache.RelayerCacheServer{CacheServer: &cs} return ctx, cacheServer } @@ -62,11 +63,12 @@ func TestCacheSetGet(t *testing.T) { response := &pairingtypes.RelayReply{} messageSet := pairingtypes.RelayCacheSet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Response: response, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Response: response, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _, err := cacheServer.SetRelay(ctx, &messageSet) @@ -77,10 +79,11 @@ func TestCacheSetGet(t *testing.T) { // now to get it messageGet := pairingtypes.RelayCacheGet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _, err = cacheServer.GetRelay(ctx, &messageGet) if tt.valid { @@ -121,6 +124,18 @@ func shallowCopy(request *pairingtypes.RelayPrivateData) *pairingtypes.RelayPriv } } +func HashRequest(t *testing.T, request *pairingtypes.RelayPrivateData, chainId string) []byte { + hash, _, err := chainlib.HashCacheRequest(request, chainId) + require.NoError(t, err) + return hash +} + +func HashRequestFormatter(t *testing.T, request *pairingtypes.RelayPrivateData, chainId string) ([]byte, func([]byte) []byte) { + hash, outputFormatter, err := chainlib.HashCacheRequest(request, chainId) + require.NoError(t, err) + return hash, outputFormatter +} + func TestCacheGetWithoutSet(t *testing.T) { t.Parallel() tests := []struct { @@ -147,12 +162,12 @@ func TestCacheGetWithoutSet(t *testing.T) { request := getRequest(1230, []byte(StubSig), StubApiInterface) // now to get it - messageGet := pairingtypes.RelayCacheGet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _, err := cacheServer.GetRelay(ctx, &messageGet) @@ -196,11 +211,12 @@ func TestCacheFailSetWithInvalidRequestBlock(t *testing.T) { response := &pairingtypes.RelayReply{} messageSet := pairingtypes.RelayCacheSet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Response: response, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Response: response, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _, err := cacheServer.SetRelay(ctx, &messageSet) @@ -278,11 +294,12 @@ func TestCacheSetGetLatest(t *testing.T) { response := &pairingtypes.RelayReply{LatestBlock: tt.latestBlockForSetRelay} messageSet := pairingtypes.RelayCacheSet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Response: response, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Response: response, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _ = utils.LavaFormatDebug("next test", utils.Attribute{Key: "name", Value: tt.name}) _, err := cacheServer.SetRelay(ctx, &messageSet) @@ -297,12 +314,15 @@ func TestCacheSetGetLatest(t *testing.T) { // modify the request to get latest request.RequestBlock = spectypes.LATEST_BLOCK messageGet := pairingtypes.RelayCacheGet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } + // hashes needs to be equal. + require.Equal(t, messageGet.RequestHash, messageSet.RequestHash) cacheReply, err := cacheServer.GetRelay(ctx, &messageGet) if tt.valid { require.NoError(t, err) @@ -356,11 +376,12 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) { response := &pairingtypes.RelayReply{LatestBlock: tt.latestBlockForSetRelay} messageSet := pairingtypes.RelayCacheSet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Response: response, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Response: response, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _, err := cacheServer.SetRelay(ctx, &messageSet) @@ -372,10 +393,11 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) { // modify the request to get latest request.RequestBlock = spectypes.LATEST_BLOCK messageGet := pairingtypes.RelayCacheGet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } cacheReply, err := cacheServer.GetRelay(ctx, &messageGet) @@ -396,19 +418,21 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) { request2.Data = []byte(StubData + "nonRelevantData") response.LatestBlock = latestBlockForRelay + 1 messageSet2 := pairingtypes.RelayCacheSet{ - Request: shallowCopy(request2), - BlockHash: tt.hash, - ChainID: StubChainID, - Response: response, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request2, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Response: response, + Finalized: tt.finalized, + RequestedBlock: request2.RequestBlock, } _, err = cacheServer.SetRelay(ctx, &messageSet2) // we use this to increase latest block received require.NoError(t, err) messageGet = pairingtypes.RelayCacheGet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } // repeat our latest block get, this time we expect it to look for a newer block and fail _, err = cacheServer.GetRelay(ctx, &messageGet) @@ -452,13 +476,13 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) { response := &pairingtypes.RelayReply{ Data: formatIDInJsonResponse(id), // response has the old id when cached } - messageSet := pairingtypes.RelayCacheSet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Response: response, - Finalized: tt.finalized, + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: tt.hash, + ChainId: StubChainID, + Response: response, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } _, err := cacheServer.SetRelay(ctx, &messageSet) @@ -471,15 +495,17 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) { changedID := id + 1 // now we change the ID: request.Data = formatIDInJson(changedID) - + hash, outputFormatter := HashRequestFormatter(t, request, StubChainID) messageGet := pairingtypes.RelayCacheGet{ - Request: shallowCopy(request), - BlockHash: tt.hash, - ChainID: StubChainID, - Finalized: tt.finalized, + RequestHash: hash, + BlockHash: tt.hash, + ChainId: StubChainID, + Finalized: tt.finalized, + RequestedBlock: request.RequestBlock, } cacheReply, err := cacheServer.GetRelay(ctx, &messageGet) if tt.valid { + cacheReply.Reply.Data = outputFormatter(cacheReply.Reply.Data) require.NoError(t, err) result := gjson.GetBytes(cacheReply.GetReply().Data, format.IDFieldName) extractedID := result.Raw diff --git a/ecosystem/cache/command.go b/ecosystem/cache/command.go index 410ff97120..3d1dbf640b 100644 --- a/ecosystem/cache/command.go +++ b/ecosystem/cache/command.go @@ -44,6 +44,5 @@ longer DefaultExpirationForNonFinalized will reduce sync QoS for "latest" reques cacheCmd.Flags().Duration(ExpirationNonFinalizedFlagName, DefaultExpirationForNonFinalized, "how long does a cache entry lasts in the cache for a non finalized entry") cacheCmd.Flags().String(FlagMetricsAddress, DisabledFlagOption, "address to listen to prometheus metrics 127.0.0.1:5555, later you can curl http://127.0.0.1:5555/metrics") cacheCmd.Flags().Int64(FlagCacheSizeName, 2*1024*1024*1024, "the maximal amount of entries to save") - cacheCmd.Flags().Bool(FlagUseMethodInApiSpecificCacheMetricsName, false, "use method in the cache specific api metric") return cacheCmd } diff --git a/ecosystem/cache/handlers.go b/ecosystem/cache/handlers.go index 5369c5d2fc..ff87cbcb7a 100644 --- a/ecosystem/cache/handlers.go +++ b/ecosystem/cache/handlers.go @@ -3,7 +3,7 @@ package cache import ( "bytes" "context" - "encoding/json" + "encoding/binary" "fmt" "math" "strconv" @@ -13,11 +13,10 @@ import ( sdkerrors "cosmossdk.io/errors" "github.com/dgraph-io/ristretto" - "github.com/lavanet/lava/ecosystem/cache/format" - rpcInterfaceMessages "github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcInterfaceMessages" "github.com/lavanet/lava/protocol/lavaprotocol" "github.com/lavanet/lava/protocol/parser" "github.com/lavanet/lava/utils" + "github.com/lavanet/lava/utils/slices" pairingtypes "github.com/lavanet/lava/x/pairing/types" spectypes "github.com/lavanet/lava/x/spec/types" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -45,12 +44,14 @@ type CacheValue struct { Response pairingtypes.RelayReply Hash []byte OptionalMetadata []pairingtypes.Metadata + SeenBlock int64 } func (cv *CacheValue) ToCacheReply() *pairingtypes.CacheRelayReply { return &pairingtypes.CacheRelayReply{ Reply: &cv.Response, OptionalMetadata: cv.OptionalMetadata, + SeenBlock: cv.SeenBlock, } } @@ -86,60 +87,97 @@ func (s *RelayerCacheServer) getSeenBlockForSharedStateMode(chainId string, shar func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) { cacheReply := &pairingtypes.CacheRelayReply{} + var cacheReplyTmp *pairingtypes.CacheRelayReply var err error var seenBlock int64 - waitGroup := sync.WaitGroup{} - waitGroup.Add(2) // currently we have two groups getRelayInner and getSeenBlock - requestedBlock := relayCacheGet.Request.RequestBlock // save requested block - - // fetch all reads at the same time. - go func() { - defer waitGroup.Done() - var cacheReplyTmp *pairingtypes.CacheRelayReply - cacheReplyTmp, err = s.getRelayInner(ctx, relayCacheGet) - if cacheReplyTmp != nil { - cacheReply = cacheReplyTmp // set cache reply only if its not nil, as we need to store seen block in it. - } - }() - go func() { - defer waitGroup.Done() - // set seen block if required - seenBlock = s.getSeenBlockForSharedStateMode(relayCacheGet.ChainID, relayCacheGet.SharedStateId) - }() - - // wait for all reads to complete before moving forward - waitGroup.Wait() - // set seen block. - if seenBlock > cacheReply.SeenBlock { - cacheReply.SeenBlock = seenBlock + originalRequestedBlock := relayCacheGet.RequestedBlock // save requested block prior to swap + if originalRequestedBlock < 0 { // we need to fetch stored latest block information. + getLatestBlock := s.getLatestBlock(latestBlockKey(relayCacheGet.ChainId, "")) + relayCacheGet.RequestedBlock = lavaprotocol.ReplaceRequestedBlock(originalRequestedBlock, getLatestBlock) } - var hit bool - if err != nil { - s.cacheMiss(ctx, err) + utils.LavaFormatDebug("Got Cache Get", utils.Attribute{Key: "request_hash", Value: string(relayCacheGet.RequestHash)}, + utils.Attribute{Key: "finalized", Value: relayCacheGet.Finalized}, + utils.Attribute{Key: "requested_block", Value: originalRequestedBlock}, + utils.Attribute{Key: "block_hash", Value: relayCacheGet.BlockHash}, + utils.Attribute{Key: "requested_block_parsed", Value: relayCacheGet.RequestedBlock}, + utils.Attribute{Key: "seen_block", Value: relayCacheGet.SeenBlock}, + ) + if relayCacheGet.RequestedBlock >= 0 { // we can only fetch + // check seen block is larger than our requested block, we don't need to fetch seen block prior as its already larger than requested block + waitGroup := sync.WaitGroup{} + waitGroup.Add(2) // currently we have two groups getRelayInner and getSeenBlock + // fetch all reads at the same time. + go func() { + defer waitGroup.Done() + cacheReplyTmp, err = s.getRelayInner(relayCacheGet) + if cacheReplyTmp != nil { + cacheReply = cacheReplyTmp // set cache reply only if its not nil, as we need to store seen block in it. + } + }() + go func() { + defer waitGroup.Done() + // set seen block if required + seenBlock = s.getSeenBlockForSharedStateMode(relayCacheGet.ChainId, relayCacheGet.SharedStateId) + if seenBlock > relayCacheGet.SeenBlock { + relayCacheGet.SeenBlock = seenBlock // update state. + } + }() + // wait for all reads to complete before moving forward + waitGroup.Wait() + + // validate that the response seen block is larger or equal to our expectations. + if cacheReply.SeenBlock < slices.Min([]int64{relayCacheGet.SeenBlock, relayCacheGet.RequestedBlock}) { // TODO unitest this. + // Error, our reply seen block is not larger than our expectations, meaning we got an old response + // this can happen only in the case relayCacheGet.SeenBlock < relayCacheGet.RequestedBlock + // by setting the err variable we will get a cache miss, and the relay will continue to the node. + err = utils.LavaFormatDebug("reply seen block is smaller than our expectations", + utils.LogAttr("cacheReply.SeenBlock", cacheReply.SeenBlock), + utils.LogAttr("seenBlock", relayCacheGet.SeenBlock), + ) + } + // set seen block. + if relayCacheGet.SeenBlock > cacheReply.SeenBlock { + cacheReply.SeenBlock = relayCacheGet.SeenBlock + } } else { - hit = true - s.cacheHit(ctx) + // set the error so cache miss will trigger. + err = utils.LavaFormatDebug("Requested block is invalid", + utils.LogAttr("requested block", relayCacheGet.RequestedBlock), + utils.LogAttr("request_hash", string(relayCacheGet.RequestHash)), + ) } - // add prometheus metrics - s.CacheServer.CacheMetrics.AddApiSpecific(requestedBlock, relayCacheGet.ChainID, getMethodFromRequest(relayCacheGet), relayCacheGet.Request.ApiInterface, hit) + + // add prometheus metrics asynchronously + go func() { + cacheMetricsContext, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + var hit bool + if err != nil { + s.cacheMiss(cacheMetricsContext, err) + } else { + hit = true + s.cacheHit(cacheMetricsContext) + } + s.CacheServer.CacheMetrics.AddApiSpecific(originalRequestedBlock, relayCacheGet.ChainId, hit) + }() return cacheReply, err } -func (s *RelayerCacheServer) getRelayInner(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) { - inputFormatter, outputFormatter := format.FormatterForRelayRequestAndResponse(relayCacheGet.Request.ApiInterface) - relayCacheGet.Request.Data = inputFormatter(relayCacheGet.Request.Data) - requestedBlock := relayCacheGet.Request.RequestBlock - getLatestBlock := s.getLatestBlock(latestBlockKey(relayCacheGet.ChainID, relayCacheGet.Provider)) - relayCacheGet.Request.RequestBlock = lavaprotocol.ReplaceRequestedBlock(requestedBlock, getLatestBlock) - cacheKey := formatCacheKey(relayCacheGet.Request.ApiInterface, relayCacheGet.ChainID, relayCacheGet.Request, relayCacheGet.Provider) - utils.LavaFormatDebug("Got Cache Get", utils.Attribute{Key: "cacheKey", Value: parser.CapStringLen(cacheKey)}, - utils.Attribute{Key: "finalized", Value: relayCacheGet.Finalized}, - utils.Attribute{Key: "requestedBlock", Value: requestedBlock}, - utils.Attribute{Key: "requestHash", Value: relayCacheGet.BlockHash}, - utils.Attribute{Key: "getLatestBlock", Value: relayCacheGet.Request.RequestBlock}, - ) +// formatHashKey formats the hash key by adding latestBlock information. +func (s *RelayerCacheServer) formatHashKey(hash []byte, parsedRequestedBlock int64) []byte { + // Append the latestBlock and seenBlock directly to the hash using little-endian encoding + hash = binary.LittleEndian.AppendUint64(hash, uint64(parsedRequestedBlock)) + return hash +} + +func (s *RelayerCacheServer) getRelayInner(relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) { + // cache key is compressed from: + // 1. Request hash including all the information inside RelayPrivateData (Salt can cause issues if not dealt with on consumer side.) + // 2. chain-id (same requests for different chains should get unique results) + // 3. seen block to distinguish between seen entries and unseen entries. + cacheKey := s.formatHashKey(relayCacheGet.RequestHash, relayCacheGet.RequestedBlock) cacheVal, cache_source, found := s.findInAllCaches(relayCacheGet.Finalized, cacheKey) // TODO: use the information when a new block is finalized if !found { @@ -147,7 +185,6 @@ func (s *RelayerCacheServer) getRelayInner(ctx context.Context, relayCacheGet *p } if cacheVal.Hash == nil { // if we didn't store a hash its also always a match - cacheVal.Response.Data = outputFormatter(cacheVal.Response.Data) utils.LavaFormatDebug("returning response", utils.Attribute{Key: "cache_source", Value: cache_source}, utils.Attribute{Key: "hash", Value: "nil"}, utils.Attribute{Key: "response_data", Value: parser.CapStringLen(string(cacheVal.Response.Data))}, @@ -156,7 +193,6 @@ func (s *RelayerCacheServer) getRelayInner(ctx context.Context, relayCacheGet *p } // entry found, now we check the hash requested and hash stored if bytes.Equal(cacheVal.Hash, relayCacheGet.BlockHash) { - cacheVal.Response.Data = outputFormatter(cacheVal.Response.Data) utils.LavaFormatDebug("returning response", utils.Attribute{Key: "cache_source", Value: cache_source}, utils.Attribute{Key: "hash", Value: "match"}, utils.Attribute{Key: "response_data", Value: parser.CapStringLen(string(cacheVal.Response.Data))}, @@ -213,32 +249,31 @@ func (s *RelayerCacheServer) setSeenBlockOnSharedStateMode(chainId, sharedStateI } func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairingtypes.RelayCacheSet) (*emptypb.Empty, error) { - if relayCacheSet.Request.RequestBlock < 0 { - return nil, utils.LavaFormatError("invalid relay cache set data, request block is negative", nil, utils.Attribute{Key: "requestBlock", Value: relayCacheSet.Request.RequestBlock}) + if relayCacheSet.RequestedBlock < 0 { + return nil, utils.LavaFormatError("invalid relay cache set data, request block is negative", nil, utils.Attribute{Key: "requestBlock", Value: relayCacheSet.RequestedBlock}) } - // TODO: make this non-blocking - inputFormatter, _ := format.FormatterForRelayRequestAndResponse(relayCacheSet.Request.ApiInterface) - relayCacheSet.Request.Data = inputFormatter(relayCacheSet.Request.Data) // so we can find the entry regardless of id + // Getting the max block number between the seen block on the consumer side vs the latest block on the response of the provider + latestKnownBlock := int64(math.Max(float64(relayCacheSet.Response.LatestBlock), float64(relayCacheSet.SeenBlock))) - cacheKey := formatCacheKey(relayCacheSet.Request.ApiInterface, relayCacheSet.ChainID, relayCacheSet.Request, relayCacheSet.Provider) - cacheValue := formatCacheValue(relayCacheSet.Response, relayCacheSet.BlockHash, relayCacheSet.Finalized, relayCacheSet.OptionalMetadata) - utils.LavaFormatDebug("Got Cache Set", utils.Attribute{Key: "cacheKey", Value: parser.CapStringLen(cacheKey)}, + cacheKey := s.formatHashKey(relayCacheSet.RequestHash, relayCacheSet.RequestedBlock) + cacheValue := formatCacheValue(relayCacheSet.Response, relayCacheSet.BlockHash, relayCacheSet.Finalized, relayCacheSet.OptionalMetadata, latestKnownBlock) + utils.LavaFormatDebug("Got Cache Set", utils.Attribute{Key: "cacheKey", Value: string(cacheKey)}, utils.Attribute{Key: "finalized", Value: fmt.Sprintf("%t", relayCacheSet.Finalized)}, + utils.Attribute{Key: "requested_block", Value: relayCacheSet.RequestedBlock}, utils.Attribute{Key: "response_data", Value: parser.CapStringLen(string(relayCacheSet.Response.Data))}, - utils.Attribute{Key: "requestHash", Value: string(relayCacheSet.BlockHash)}) + utils.Attribute{Key: "requestHash", Value: string(relayCacheSet.BlockHash)}, + utils.Attribute{Key: "latestKnownBlock", Value: string(relayCacheSet.BlockHash)}) // finalized entries can stay there if relayCacheSet.Finalized { cache := s.CacheServer.finalizedCache cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationFinalized) } else { cache := s.CacheServer.tempCache - cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.getExpirationForChain(relayCacheSet.ChainID, relayCacheSet.BlockHash)) + cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.getExpirationForChain(time.Duration(relayCacheSet.AverageBlockTime), relayCacheSet.BlockHash)) } // Setting the seen block for shared state. - // Getting the max block number between the seen block on the consumer side vs the latest block on the response of the provider - latestKnownBlock := int64(math.Max(float64(relayCacheSet.Response.LatestBlock), float64(relayCacheSet.Request.SeenBlock))) - s.setSeenBlockOnSharedStateMode(relayCacheSet.ChainID, relayCacheSet.SharedStateId, latestKnownBlock) - s.setLatestBlock(latestBlockKey(relayCacheSet.ChainID, relayCacheSet.Provider), latestKnownBlock) + s.setSeenBlockOnSharedStateMode(relayCacheSet.ChainId, relayCacheSet.SharedStateId, latestKnownBlock) + s.setLatestBlock(latestBlockKey(relayCacheSet.ChainId, ""), latestKnownBlock) return &emptypb.Empty{}, nil } @@ -302,17 +337,17 @@ func (s *RelayerCacheServer) setLatestBlock(key string, latestBlock int64) { s.performInt64WriteWithValidationAndRetry(get, set, latestBlock) } -func (s *RelayerCacheServer) getExpirationForChain(chainID string, blockHash []byte) time.Duration { +func (s *RelayerCacheServer) getExpirationForChain(averageBlockTimeForChain time.Duration, blockHash []byte) time.Duration { if blockHash != nil { // this means that this entry has a block hash, so we don't have to delete it quickly return s.CacheServer.ExpirationFinalized } // if there is no block hash, for non finalized we cant know if there was a fork, so we have to delete it as soon as we have new data // with the assumption new data should arrive by the arrival of a new block (average block time) - return s.CacheServer.ExpirationForChain(chainID) + return s.CacheServer.ExpirationForChain(averageBlockTimeForChain) } -func getNonExpiredFromCache(c *ristretto.Cache, key string) (value interface{}, found bool) { +func getNonExpiredFromCache(c *ristretto.Cache, key interface{}) (value interface{}, found bool) { value, found = c.Get(key) if found { return value, true @@ -320,8 +355,8 @@ func getNonExpiredFromCache(c *ristretto.Cache, key string) (value interface{}, return nil, false } -func (s *RelayerCacheServer) findInAllCaches(finalized bool, cacheKey string) (retVal CacheValue, cacheSource string, found bool) { - inner := func(finalized bool, cacheKey string) (interface{}, string, bool) { +func (s *RelayerCacheServer) findInAllCaches(finalized bool, cacheKey []byte) (retVal CacheValue, cacheSource string, found bool) { + inner := func(finalized bool, cacheKey []byte) (interface{}, string, bool) { if finalized { cache := s.CacheServer.finalizedCache value, found := getNonExpiredFromCache(cache, cacheKey) @@ -362,20 +397,7 @@ func (s *RelayerCacheServer) findInAllCaches(finalized bool, cacheKey string) (r return CacheValue{}, "", false } -func formatCacheKey(apiInterface string, chainID string, request *pairingtypes.RelayPrivateData, provider string) string { - return chainID + SEP + usedFieldsFromRequest(request, provider) -} - -func usedFieldsFromRequest(request *pairingtypes.RelayPrivateData, provider string) string { - // used fields: - // RelayData except for salt: because it defines the query - // Provider: because we want to keep coherence between calls, assuming different providers can return different forks, useful for cache in rpcconsumer - request.Salt = nil - relayDataStr := request.String() - return relayDataStr + SEP + provider -} - -func formatCacheValue(response *pairingtypes.RelayReply, hash []byte, finalized bool, optionalMetadata []pairingtypes.Metadata) CacheValue { +func formatCacheValue(response *pairingtypes.RelayReply, hash []byte, finalized bool, optionalMetadata []pairingtypes.Metadata, seenBlock int64) CacheValue { response.Sig = []byte{} // make sure we return a signed value, as the output was modified by our outputParser if !finalized { // hash value is only used on non finalized entries to check for forks @@ -383,6 +405,7 @@ func formatCacheValue(response *pairingtypes.RelayReply, hash []byte, finalized Response: *response, Hash: hash, OptionalMetadata: optionalMetadata, + SeenBlock: seenBlock, } } // no need to store the hash value for finalized entries @@ -390,6 +413,7 @@ func formatCacheValue(response *pairingtypes.RelayReply, hash []byte, finalized Response: *response, Hash: nil, OptionalMetadata: optionalMetadata, + SeenBlock: seenBlock, } } @@ -399,15 +423,3 @@ func latestBlockKey(chainID string, uniqueId string) string { // because we want to support coherence in providers return chainID + "_" + uniqueId } - -func getMethodFromRequest(relayCacheGet *pairingtypes.RelayCacheGet) string { - if relayCacheGet.Request.ApiUrl != "" { - return relayCacheGet.Request.ApiUrl - } - var msg rpcInterfaceMessages.JsonrpcMessage - err := json.Unmarshal(relayCacheGet.Request.Data, &msg) - if err != nil { - return "failed_parsing_method" - } - return msg.Method -} diff --git a/ecosystem/cache/metrics.go b/ecosystem/cache/metrics.go index 57cb9dccb8..0bd50813c6 100644 --- a/ecosystem/cache/metrics.go +++ b/ecosystem/cache/metrics.go @@ -17,14 +17,13 @@ const ( ) type CacheMetrics struct { - lock sync.RWMutex - totalHits *prometheus.CounterVec - totalMisses *prometheus.CounterVec - apiSpecifics *prometheus.GaugeVec - useMethodInApiSpecificMetric bool + lock sync.RWMutex + totalHits *prometheus.CounterVec + totalMisses *prometheus.CounterVec + apiSpecifics *prometheus.GaugeVec } -func NewCacheMetricsServer(listenAddress string, useMethodInApiSpecificMetric bool) *CacheMetrics { +func NewCacheMetricsServer(listenAddress string) *CacheMetrics { if listenAddress == DisabledFlagOption { utils.LavaFormatWarning("prometheus endpoint inactive, option is disabled", nil) return nil @@ -39,14 +38,7 @@ func NewCacheMetricsServer(listenAddress string, useMethodInApiSpecificMetric bo Help: "The total number of misses the cache server could not reply.", }, []string{totalMissesKey}) - var apiSpecificsLabelNames []string - - if useMethodInApiSpecificMetric { - apiSpecificsLabelNames = []string{"requested_block", "chain_id", "method", "api_interface", "result"} - } else { - apiSpecificsLabelNames = []string{"requested_block", "chain_id", "api_interface", "result"} - } - + apiSpecificsLabelNames := []string{"requested_block", "chain_id", "result"} apiSpecifics := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cache_api_specifics", Help: "api specific information", @@ -61,10 +53,9 @@ func NewCacheMetricsServer(listenAddress string, useMethodInApiSpecificMetric bo http.ListenAndServe(listenAddress, nil) }() return &CacheMetrics{ - totalHits: totalHits, - totalMisses: totalMisses, - apiSpecifics: apiSpecifics, - useMethodInApiSpecificMetric: useMethodInApiSpecificMetric, + totalHits: totalHits, + totalMisses: totalMisses, + apiSpecifics: apiSpecifics, } } @@ -82,11 +73,10 @@ func (c *CacheMetrics) addMiss() { c.totalMisses.WithLabelValues(totalMissesKey).Add(1) } -func (c *CacheMetrics) AddApiSpecific(block int64, chainId string, method string, apiInterface string, hit bool) { +func (c *CacheMetrics) AddApiSpecific(block int64, chainId string, hit bool) { if c == nil { return } - requestedBlock := "specific" if spectypes.LATEST_BLOCK == block { requestedBlock = "latest" @@ -97,18 +87,14 @@ func (c *CacheMetrics) AddApiSpecific(block int64, chainId string, method string c.lock.Lock() defer c.lock.Unlock() if hit { - c.apiSpecificWithMethodIfNeeded(requestedBlock, chainId, method, apiInterface, "hit") + c.apiSpecificWithMethodIfNeeded(requestedBlock, chainId, "hit") c.addHit() } else { - c.apiSpecificWithMethodIfNeeded(requestedBlock, chainId, method, apiInterface, "miss") + c.apiSpecificWithMethodIfNeeded(requestedBlock, chainId, "miss") c.addMiss() } } -func (c *CacheMetrics) apiSpecificWithMethodIfNeeded(requestedBlock, chainId, method, apiInterface, hitOrMiss string) { - if c.useMethodInApiSpecificMetric { - c.apiSpecifics.WithLabelValues(requestedBlock, chainId, method, apiInterface, hitOrMiss).Add(1) // Removed "specifics" label - } else { - c.apiSpecifics.WithLabelValues(requestedBlock, chainId, apiInterface, hitOrMiss).Add(1) // Removed "specifics" and "method" label - } +func (c *CacheMetrics) apiSpecificWithMethodIfNeeded(requestedBlock, chainId, hitOrMiss string) { + c.apiSpecifics.WithLabelValues(requestedBlock, chainId, hitOrMiss).Add(1) // Removed "specifics" label } diff --git a/ecosystem/cache/server.go b/ecosystem/cache/server.go index 0facf08dc4..4c801047ba 100644 --- a/ecosystem/cache/server.go +++ b/ecosystem/cache/server.go @@ -9,6 +9,8 @@ import ( "os/signal" "time" + "github.com/lavanet/lava/utils/slices" + "github.com/dgraph-io/ristretto" "github.com/improbable-eng/grpc-web/go/grpcweb" "github.com/lavanet/lava/utils" @@ -20,13 +22,12 @@ import ( ) const ( - ExpirationFlagName = "expiration" - ExpirationNonFinalizedFlagName = "expiration-non-finalized" - FlagCacheSizeName = "max-items" - FlagUseMethodInApiSpecificCacheMetricsName = "use-method-in-cache-metrics" - DefaultExpirationForNonFinalized = 500 * time.Millisecond - DefaultExpirationTimeFinalized = time.Hour - CacheNumCounters = 100000000 // expect 10M items + ExpirationFlagName = "expiration" + ExpirationNonFinalizedFlagName = "expiration-non-finalized" + FlagCacheSizeName = "max-items" + DefaultExpirationForNonFinalized = 500 * time.Millisecond + DefaultExpirationTimeFinalized = time.Hour + CacheNumCounters = 100000000 // expect 10M items ) type CacheServer struct { @@ -38,7 +39,7 @@ type CacheServer struct { CacheMaxCost int64 } -func (cs *CacheServer) InitCache(ctx context.Context, expiration time.Duration, expirationNonFinalized time.Duration, metricsAddr string, useMethodInApiSpecificMetric bool) { +func (cs *CacheServer) InitCache(ctx context.Context, expiration time.Duration, expirationNonFinalized time.Duration, metricsAddr string) { cs.ExpirationFinalized = expiration cs.ExpirationNonFinalized = expirationNonFinalized cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: cs.CacheMaxCost, BufferItems: 64}) @@ -54,7 +55,7 @@ func (cs *CacheServer) InitCache(ctx context.Context, expiration time.Duration, cs.finalizedCache = cache // initialize prometheus - cs.CacheMetrics = NewCacheMetricsServer(metricsAddr, useMethodInApiSpecificMetric) + cs.CacheMetrics = NewCacheMetricsServer(metricsAddr) } func (cs *CacheServer) Serve(ctx context.Context, @@ -112,9 +113,9 @@ func (cs *CacheServer) Serve(ctx context.Context, } } -func (cs *CacheServer) ExpirationForChain(chainID string) time.Duration { - // TODO: query spec from lava for average block time and put here duration max(blockTime/2, 200ms) - return cs.ExpirationNonFinalized +func (cs *CacheServer) ExpirationForChain(averageBlockTimeForChain time.Duration) time.Duration { + eighthBlock := averageBlockTimeForChain / 8 + return slices.Max([]time.Duration{eighthBlock, cs.ExpirationNonFinalized}) // return the maximum TTL between an eighth block and expiration } func Server( @@ -139,12 +140,7 @@ func Server( } cs := CacheServer{CacheMaxCost: cacheMaxCost} - useMethodInApiSpecificMetric, err := flags.GetBool(FlagUseMethodInApiSpecificCacheMetricsName) - if err != nil { - utils.LavaFormatFatal("failed to read flag", err, utils.Attribute{Key: "flag", Value: FlagUseMethodInApiSpecificCacheMetricsName}) - } - - cs.InitCache(ctx, expiration, expirationNonFinalized, metricsAddr, useMethodInApiSpecificMetric) + cs.InitCache(ctx, expiration, expirationNonFinalized, metricsAddr) // TODO: have a state tracker cs.Serve(ctx, listenAddr) } diff --git a/proto/lavanet/lava/pairing/relayCache.proto b/proto/lavanet/lava/pairing/relayCache.proto index f278b767ea..8ad1e37387 100644 --- a/proto/lavanet/lava/pairing/relayCache.proto +++ b/proto/lavanet/lava/pairing/relayCache.proto @@ -23,22 +23,31 @@ message CacheUsage { uint64 CacheMisses = 2; } -message RelayCacheGet { +// data containing the cache key hash +message CacheHash { RelayPrivateData request = 1; - bytes blockHash = 2; - string chainID = 3; //Used to differentiate between different chains so each has its own bucket - bool finalized = 4; - string provider = 5; - string shared_state_id = 6; // empty id for no shared state + string chain_id = 2; +} + +message RelayCacheGet { + bytes request_hash = 1; // hash of the RelayPrivateData + bytes block_hash = 2; + bool finalized = 3; + int64 requested_block = 4; + string shared_state_id = 5; // empty id for no shared state + string chain_id = 6; // used to set latest block per chain. + int64 seen_block = 7; } message RelayCacheSet { - RelayPrivateData request = 1; - bytes blockHash = 2; - string chainID = 3; //Used to differentiate between different chains so each has its own bucket - RelayReply response = 4; - bool finalized = 5; - string provider = 6; - repeated Metadata optional_metadata = 7 [(gogoproto.nullable) = false]; - string shared_state_id = 8; // empty id for no shared state + bytes request_hash = 1; // hash of the RelayPrivateData + bytes block_hash = 2; + RelayReply response = 3; + bool finalized = 4; + repeated Metadata optional_metadata = 5 [(gogoproto.nullable) = false]; + string shared_state_id = 6; // empty id for no shared state + int64 requested_block = 7; + string chain_id = 9; // used to set latest block per chain. + int64 seen_block = 10; + int64 average_block_time = 11; } \ No newline at end of file diff --git a/protocol/chainlib/chain_fetcher.go b/protocol/chainlib/chain_fetcher.go index 8daf2458ed..5397e7e3c5 100644 --- a/protocol/chainlib/chain_fetcher.go +++ b/protocol/chainlib/chain_fetcher.go @@ -7,17 +7,19 @@ import ( "sync/atomic" "time" - "golang.org/x/exp/slices" - "github.com/cosmos/cosmos-sdk/client" + "github.com/golang/protobuf/proto" + formatter "github.com/lavanet/lava/ecosystem/cache/format" "github.com/lavanet/lava/protocol/chainlib/chainproxy" "github.com/lavanet/lava/protocol/common" "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/protocol/parser" "github.com/lavanet/lava/protocol/performance" "github.com/lavanet/lava/utils" + "github.com/lavanet/lava/utils/sigs" pairingtypes "github.com/lavanet/lava/x/pairing/types" spectypes "github.com/lavanet/lava/x/spec/types" + "golang.org/x/exp/slices" ) const ( @@ -93,7 +95,26 @@ func (cf *ChainFetcher) populateCache(relayData *pairingtypes.RelayPrivateData, new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease) defer cancel() // provider side doesn't use SharedStateId, so we default it to empty so it wont have effect. - err := cf.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{Request: relayData, BlockHash: requestedBlockHash, ChainID: cf.endpoint.ChainID, Response: reply, Finalized: finalized, OptionalMetadata: nil, SharedStateId: ""}) + + hash, _, err := HashCacheRequest(relayData, cf.endpoint.ChainID) + if err != nil { + utils.LavaFormatError("populateCache Failed getting Hash for request", err) + return + } + + _, averageBlockTime, _, _ := cf.chainParser.ChainBlockStats() + err = cf.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ + RequestHash: hash, + BlockHash: requestedBlockHash, + ChainId: cf.endpoint.ChainID, + Response: reply, + Finalized: finalized, + OptionalMetadata: nil, + RequestedBlock: relayData.RequestBlock, + SeenBlock: relayData.SeenBlock, // seen block is latestBlock so it will hit consumers requesting it. + SharedStateId: "", + AverageBlockTime: int64(averageBlockTime), + }) if err != nil { utils.LavaFormatWarning("chain fetcher error updating cache with new entry", err) } @@ -274,7 +295,7 @@ func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error) return blockNum, nil } -func (cf *ChainFetcher) constructRelayData(conectionType string, path string, data []byte, requestBlock int64, addon string, extensions []string) *pairingtypes.RelayPrivateData { +func (cf *ChainFetcher) constructRelayData(conectionType string, path string, data []byte, requestBlock int64, addon string, extensions []string, latestBlock int64) *pairingtypes.RelayPrivateData { relayData := &pairingtypes.RelayPrivateData{ ConnectionType: conectionType, ApiUrl: path, @@ -284,6 +305,7 @@ func (cf *ChainFetcher) constructRelayData(conectionType string, path string, da Metadata: nil, Addon: addon, Extensions: extensions, + SeenBlock: latestBlock, } return relayData } @@ -334,7 +356,7 @@ func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64) latestBlock := atomic.LoadInt64(&cf.latestBlock) // assuming FetchLatestBlockNum is called before this one it's always true if latestBlock > 0 { finalized := spectypes.IsFinalizedBlock(blockNum, latestBlock, blockDistanceToFinalization) - cf.populateCache(cf.constructRelayData(collectionData.Type, path, data, blockNum, "", nil), reply, []byte(res), finalized) + cf.populateCache(cf.constructRelayData(collectionData.Type, path, data, blockNum, "", nil, latestBlock), reply, []byte(res), finalized) } return res, nil } @@ -452,3 +474,42 @@ func NewVerificationsOnlyChainFetcher(ctx context.Context, chainRouter ChainRout cf := &DummyChainFetcher{ChainFetcher: &cfi} return cf } + +// this method will calculate the request hash by changing the original object, and returning the data back to it after calculating the hash +// couldn't be used in parallel +func HashCacheRequest(relayData *pairingtypes.RelayPrivateData, chainId string) ([]byte, func([]byte) []byte, error) { + originalData := relayData.Data + originalSalt := relayData.Salt + originalRequestedBlock := relayData.RequestBlock + originalSeenBlock := relayData.SeenBlock + defer func() { + // return all information back to the object on defer (in any case) + relayData.Data = originalData + relayData.Salt = originalSalt + relayData.RequestBlock = originalRequestedBlock + relayData.SeenBlock = originalSeenBlock + }() + + // we need to remove some data from the request so the cache will hit properly. + inputFormatter, outputFormatter := formatter.FormatterForRelayRequestAndResponse(relayData.ApiInterface) + relayData.Data = inputFormatter(relayData.Data) // remove id from request. + relayData.Salt = nil // remove salt + relayData.SeenBlock = 0 // remove seen block + // we remove the discrepancy of requested block from the hash, and add it on the cache side instead + // this is due to the fact that we don't know the latest seen block at this moment, as on shared state + // only the cache has this information. we make sure the hashing at this stage does not include the requested block. + // It does include it on the cache key side. + relayData.RequestBlock = 0 + + cashHash := &pairingtypes.CacheHash{ + Request: relayData, + ChainId: chainId, + } + cashHashBytes, err := proto.Marshal(cashHash) + if err != nil { + return nil, outputFormatter, utils.LavaFormatError("Failed marshalling cash hash in HashCacheRequest", err) + } + + // return the value + return sigs.HashMsg(cashHashBytes), outputFormatter, nil +} diff --git a/protocol/chainlib/chainproxy/connector.go b/protocol/chainlib/chainproxy/connector.go index afcda7060c..d5aeba9690 100644 --- a/protocol/chainlib/chainproxy/connector.go +++ b/protocol/chainlib/chainproxy/connector.go @@ -30,6 +30,7 @@ const ( GRPCUseTls = "use-tls" GRPCAllowInsecureConnection = "allow-insecure-connection" MaximumNumberOfParallelConnectionsAttempts = 10 + MaxCallRecvMsgSize = 1024 * 1024 * 32 // setting receive size to 32mb instead of 4mb default ) var NumberOfParallelConnections uint = 10 @@ -307,7 +308,7 @@ func (connector *GRPCConnector) increaseNumberOfClients(ctx context.Context, num var err error for connectionAttempt := 0; connectionAttempt < MaximumNumberOfParallelConnectionsAttempts; connectionAttempt++ { nctx, cancel := connector.nodeUrl.LowerContextTimeoutWithDuration(ctx, common.AverageWorldLatency*2) - grpcClient, err = grpc.DialContext(nctx, connector.nodeUrl.Url, grpc.WithBlock(), connector.getTransportCredentials()) + grpcClient, err = grpc.DialContext(nctx, connector.nodeUrl.Url, grpc.WithBlock(), connector.getTransportCredentials(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallRecvMsgSize))) if err != nil { utils.LavaFormatDebug("increaseNumberOfClients, Could not connect to the node, retrying", []utils.Attribute{{Key: "err", Value: err.Error()}, {Key: "Number Of Attempts", Value: connectionAttempt}, {Key: "nodeUrl", Value: connector.nodeUrl.UrlStr()}}...) cancel() @@ -455,7 +456,7 @@ func (connector *GRPCConnector) createConnection(ctx context.Context, nodeUrl co return nil, ctx.Err() } nctx, cancel := connector.nodeUrl.LowerContextTimeoutWithDuration(ctx, common.AverageWorldLatency*2) - rpcClient, err = grpc.DialContext(nctx, addr, grpc.WithBlock(), connector.getTransportCredentials()) + rpcClient, err = grpc.DialContext(nctx, addr, grpc.WithBlock(), connector.getTransportCredentials(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallRecvMsgSize))) cancel() if err == nil { return rpcClient, nil @@ -469,7 +470,7 @@ func (connector *GRPCConnector) createConnection(ctx context.Context, nodeUrl co } nctx, cancel := connector.nodeUrl.LowerContextTimeoutWithDuration(ctx, common.AverageWorldLatency*2) var errNew error - rpcClient, errNew = grpc.DialContext(nctx, addr, grpc.WithBlock(), grpc.WithTransportCredentials(credentialsToConnect)) + rpcClient, errNew = grpc.DialContext(nctx, addr, grpc.WithBlock(), grpc.WithTransportCredentials(credentialsToConnect), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallRecvMsgSize))) cancel() if errNew == nil { // this means our endpoint is TLS, and we support upgrading even if the config didn't explicitly say it diff --git a/protocol/chainlib/chainproxy/connector_test.go b/protocol/chainlib/chainproxy/connector_test.go index 0a81d404eb..f6e5494654 100644 --- a/protocol/chainlib/chainproxy/connector_test.go +++ b/protocol/chainlib/chainproxy/connector_test.go @@ -64,7 +64,7 @@ func createGRPCServerWithRegisteredProto(t *testing.T) *grpc.Server { return s } -func createRPCServer(t *testing.T) net.Listener { +func createRPCServer() net.Listener { timeserver := new(TimeServer) // Register the timeserver object upon which the GiveServerTime // function will be called from the RPC server (from the client) @@ -85,7 +85,7 @@ func createRPCServer(t *testing.T) net.Listener { } func TestConnector(t *testing.T) { - listener := createRPCServer(t) // create a grpcServer so we can connect to its endpoint and validate everything works. + listener := createRPCServer() // create a grpcServer so we can connect to its endpoint and validate everything works. defer listener.Close() ctx := context.Background() conn, err := NewConnector(ctx, numberOfClients, common.NodeUrl{Url: listenerAddressTcp}) diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go index c5ceed3baf..c209988ab7 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go @@ -40,24 +40,23 @@ func GetTendermintRPCError(jsonError *rpcclient.JsonError) (*tenderminttypes.RPC } var rpcError *tenderminttypes.RPCError - if jsonError != nil { - errData := "" - var ok bool - - // Make sure jsonError.Data exists - if jsonError.Data != nil { - errData, ok = (jsonError.Data).(string) - if !ok { - return nil, utils.LavaFormatError("(rpcMsg.Error.Data).(string) conversion failed", nil, utils.Attribute{Key: "data", Value: jsonError.Data}) - } - } - rpcError = &tenderminttypes.RPCError{ - Code: jsonError.Code, - Message: jsonError.Message, - Data: errData, + errData := "" + var ok bool + + // Make sure jsonError.Data exists + if jsonError.Data != nil { + errData, ok = (jsonError.Data).(string) + if !ok { + return nil, utils.LavaFormatError("(rpcMsg.Error.Data).(string) conversion failed", nil, utils.Attribute{Key: "data", Value: jsonError.Data}) } } + + rpcError = &tenderminttypes.RPCError{ + Code: jsonError.Code, + Message: jsonError.Message, + Data: errData, + } return rpcError, nil } diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 74188d73ca..8c27df6dfa 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -360,14 +360,14 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum serveExecutor = func() error { return httpServer.Serve(lis) } } - fmt.Printf(fmt.Sprintf(` + fmt.Printf(` ┌───────────────────────────────────────────────────┐ │ Lava's Grpc Server │ │ %s│ │ Lavap Version: %s│ └───────────────────────────────────────────────────┘ -`, truncateAndPadString(apil.endpoint.NetworkAddress, 36), truncateAndPadString(protocoltypes.DefaultVersion.ConsumerTarget, 21))) +`, truncateAndPadString(apil.endpoint.NetworkAddress, 36), truncateAndPadString(protocoltypes.DefaultVersion.ConsumerTarget, 21)) if err := serveExecutor(); !errors.Is(err, http.ErrServerClosed) { utils.LavaFormatFatal("Portal failed to serve", err, utils.Attribute{Key: "Address", Value: lis.Addr()}, utils.Attribute{Key: "ChainID", Value: apil.endpoint.ChainID}) } @@ -395,10 +395,10 @@ func NewGrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav if err != nil { return nil, err } - return newGrpcChainProxy(ctx, nodeUrl.Url, averageBlockTime, parser, conn, rpcProviderEndpoint) + return newGrpcChainProxy(ctx, averageBlockTime, parser, conn, rpcProviderEndpoint) } -func newGrpcChainProxy(ctx context.Context, nodeUrl string, averageBlockTime time.Duration, parser ChainParser, conn grpcConnectorInterface, rpcProviderEndpoint lavasession.RPCProviderEndpoint) (ChainProxy, error) { +func newGrpcChainProxy(ctx context.Context, averageBlockTime time.Duration, parser ChainParser, conn grpcConnectorInterface, rpcProviderEndpoint lavasession.RPCProviderEndpoint) (ChainProxy, error) { cp := &GrpcChainProxy{ BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, ErrorHandler: &GRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID}, descriptorsCache: &grpcDescriptorCache{}, diff --git a/protocol/chainlib/node_error_handler.go b/protocol/chainlib/node_error_handler.go index dc1391fcf7..474bbab04a 100644 --- a/protocol/chainlib/node_error_handler.go +++ b/protocol/chainlib/node_error_handler.go @@ -54,7 +54,7 @@ func (geh *genericErrorHandler) handleGenericErrors(ctx context.Context, nodeErr return retError } -func (geh *genericErrorHandler) handleCodeErrors(ctx context.Context, code codes.Code) error { +func (geh *genericErrorHandler) handleCodeErrors(code codes.Code) error { if code == codes.DeadlineExceeded { return utils.LavaFormatProduction("Provider Failed Sending Message", common.ContextDeadlineExceededError) } @@ -124,7 +124,7 @@ func (geh *GRPCErrorHandler) HandleNodeError(ctx context.Context, nodeError erro st, ok := status.FromError(nodeError) if ok { // Get the error message from the gRPC status - return geh.handleCodeErrors(ctx, st.Code()) + return geh.handleCodeErrors(st.Code()) } return geh.handleGenericErrors(ctx, nodeError) } diff --git a/protocol/common/conf.go b/protocol/common/conf.go index 2febcbb126..03f466e5b9 100644 --- a/protocol/common/conf.go +++ b/protocol/common/conf.go @@ -18,6 +18,7 @@ const ( MaximumConcurrentProvidersFlagName = "concurrent-providers" StatusCodeMetadataKey = "status-code" VersionMetadataKey = "lavap-version" + TimeOutForFetchingLavaBlocksFlag = "timeout-for-fetching-lava-blocks" ) func ParseEndpointArgs(endpoint_strings, yaml_config_properties []string, endpointsConfigName string) (viper_endpoints *viper.Viper, err error) { diff --git a/protocol/lavaprotocol/request_builder.go b/protocol/lavaprotocol/request_builder.go index 5929db80d2..45c32aae36 100644 --- a/protocol/lavaprotocol/request_builder.go +++ b/protocol/lavaprotocol/request_builder.go @@ -126,6 +126,8 @@ func ReplaceRequestedBlock(requestedBlock, latestBlock int64) int64 { return latestBlock case spectypes.FINALIZED_BLOCK: return latestBlock + case spectypes.PENDING_BLOCK: + return latestBlock case spectypes.EARLIEST_BLOCK: return spectypes.NOT_APPLICABLE // TODO: add support for earliest block reliability } diff --git a/protocol/lavasession/common.go b/protocol/lavasession/common.go index 0b3ffdb2b9..5aeed0a0a4 100644 --- a/protocol/lavasession/common.go +++ b/protocol/lavasession/common.go @@ -14,6 +14,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/gogo/status" + "github.com/lavanet/lava/protocol/chainlib/chainproxy" "github.com/lavanet/lava/utils" "github.com/lavanet/lava/x/pairing/keeper/scores" planstypes "github.com/lavanet/lava/x/plans/types" @@ -63,7 +64,7 @@ func ConnectgRPCClient(ctx context.Context, address string, allowInsecure bool) tlsConf.InsecureSkipVerify = true // this will allow us to use self signed certificates in development. } credentials := credentials.NewTLS(&tlsConf) - conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithTransportCredentials(credentials)) + conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithTransportCredentials(credentials), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(chainproxy.MaxCallRecvMsgSize))) return conn, err } diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index cc26c6b70c..6e993f7438 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -533,7 +533,7 @@ func (csm *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredP // Fetch provider addresses providerAddresses, err := csm.getValidProviderAddresses(ignoredProviders.providers, cuNeededForSession, requestedBlock, addon, extensions, stateful) if err != nil { - utils.LavaFormatError("could not get a provider addresses", err) + utils.LavaFormatError(csm.rpcEndpoint.ChainID+" could not get a provider addresses", err) return nil, err } diff --git a/protocol/monitoring/health.go b/protocol/monitoring/health.go index db50955ea0..8282f0f3aa 100644 --- a/protocol/monitoring/health.go +++ b/protocol/monitoring/health.go @@ -533,9 +533,6 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults return err } lavaVersion := param.GetParams().Version - if err != nil { - return err - } targetVersion := lvutil.ParseToSemanticVersion(lavaVersion.ProviderTarget) var wg sync.WaitGroup wg.Add(len(providerEntries)) diff --git a/protocol/performance/cache.go b/protocol/performance/cache.go index d7223c325d..a395fa292a 100644 --- a/protocol/performance/cache.go +++ b/protocol/performance/cache.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/lavanet/lava/protocol/chainlib/chainproxy" pairingtypes "github.com/lavanet/lava/x/pairing/types" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -17,7 +18,7 @@ type Cache struct { func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string) (*pairingtypes.RelayerCacheClient, error) { connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - conn, err := grpc.DialContext(connectCtx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + conn, err := grpc.DialContext(connectCtx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(chainproxy.MaxCallRecvMsgSize))) if err != nil { return nil, err } diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 145d57ec5d..2901e79d77 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -565,6 +565,8 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().String(reportsSendBEAddress, "", "address to send reports to") cmdRPCConsumer.Flags().BoolVar(&lavasession.DebugProbes, DebugProbesFlagName, false, "adding information to probes") cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.") + cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") + common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a6672aa576..5b5be9ee7a 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -269,7 +269,7 @@ func (rpccs *RPCConsumerServer) SendRelay( // remove lava directive headers metadata, directiveHeaders := rpccs.LavaDirectiveHeaders(metadata) relaySentTime := time.Now() - chainMessage, err := rpccs.chainParser.ParseMsg(url, []byte(req), connectionType, metadata, rpccs.getExtensionsFromDirectiveHeaders(rpccs.getLatestBlock(), directiveHeaders)) + chainMessage, err := rpccs.chainParser.ParseMsg(url, []byte(req), connectionType, metadata, rpccs.getExtensionsFromDirectiveHeaders(directiveHeaders)) if err != nil { return nil, err } @@ -374,7 +374,7 @@ func (rpccs *RPCConsumerServer) SendRelay( return errorRelayResult, utils.LavaFormatError("Failed all relay retries due to timeout consider adding 'lava-relay-timeout' header to extend the allowed timeout duration", nil, utils.Attribute{Key: "GUID", Value: ctx}) } bestRelayError := relayErrors.GetBestErrorMessageForUser() - return errorRelayResult, utils.LavaFormatError("Failed all retries", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("error", bestRelayError.err)) + return errorRelayResult, utils.LavaFormatError("Failed all retries", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("error", bestRelayError.err), utils.LogAttr("chain_id", rpccs.listenEndpoint.ChainID)) } else if len(relayErrors.relayErrors) > 0 { utils.LavaFormatDebug("relay succeeded but had some errors", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "errors", Value: relayErrors}) } @@ -443,38 +443,53 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( var cacheError error if reqBlock != spectypes.NOT_APPLICABLE || !chainMessage.GetForceCacheRefresh() { var cacheReply *pairingtypes.CacheRelayReply - cacheCtx, cancel := context.WithTimeout(ctx, common.CacheTimeout) - cacheReply, cacheError = rpccs.cache.GetEntry(cacheCtx, &pairingtypes.RelayCacheGet{Request: relayRequestData, BlockHash: nil, ChainID: chainID, Finalized: false, SharedStateId: sharedStateId}) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet - cancel() - reply := cacheReply.GetReply() - // read seen block from cache even if we had a miss we still want to get the seen block so we can use it to get the right provider. - cacheSeenBlock := cacheReply.GetSeenBlock() - // check if the cache seen block is greater than my local seen block, this means the user requested this - // request spoke with another consumer instance and use that block for inter consumer consistency. - if rpccs.sharedState && cacheSeenBlock > relayRequestData.SeenBlock { - utils.LavaFormatDebug("shared state seen block is newer", utils.LogAttr("cache_seen_block", cacheSeenBlock), utils.LogAttr("local_seen_block", relayRequestData.SeenBlock)) - relayRequestData.SeenBlock = cacheSeenBlock - // setting the fetched seen block from the cache server to our local cache as well. - rpccs.consumerConsistency.SetSeenBlock(cacheSeenBlock, dappID, consumerIp) - } + hashKey, outputFormatter, err := chainlib.HashCacheRequest(relayRequestData, chainID) + if err != nil { + utils.LavaFormatError("sendRelayToProvider Failed getting Hash for cache request", err) + } else { + cacheCtx, cancel := context.WithTimeout(ctx, common.CacheTimeout) + cacheReply, cacheError = rpccs.cache.GetEntry(cacheCtx, &pairingtypes.RelayCacheGet{ + RequestHash: hashKey, + RequestedBlock: relayRequestData.RequestBlock, + ChainId: chainID, + BlockHash: nil, + Finalized: false, + SharedStateId: sharedStateId, + SeenBlock: relayRequestData.SeenBlock, + }) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet + cancel() + reply := cacheReply.GetReply() + + // read seen block from cache even if we had a miss we still want to get the seen block so we can use it to get the right provider. + cacheSeenBlock := cacheReply.GetSeenBlock() + // check if the cache seen block is greater than my local seen block, this means the user requested this + // request spoke with another consumer instance and use that block for inter consumer consistency. + if rpccs.sharedState && cacheSeenBlock > relayRequestData.SeenBlock { + utils.LavaFormatDebug("shared state seen block is newer", utils.LogAttr("cache_seen_block", cacheSeenBlock), utils.LogAttr("local_seen_block", relayRequestData.SeenBlock)) + relayRequestData.SeenBlock = cacheSeenBlock + // setting the fetched seen block from the cache server to our local cache as well. + rpccs.consumerConsistency.SetSeenBlock(cacheSeenBlock, dappID, consumerIp) + } - // handle cache reply - if cacheError == nil && reply != nil { - // Info was fetched from cache, so we don't need to change the state - // so we can return here, no need to update anything and calculate as this info was fetched from the cache - relayResult = &common.RelayResult{ - Reply: reply, - Request: &pairingtypes.RelayRequest{ - RelayData: relayRequestData, - }, - Finalized: false, // set false to skip data reliability - ProviderInfo: common.ProviderInfo{ProviderAddress: ""}, + // handle cache reply + if cacheError == nil && reply != nil { + // Info was fetched from cache, so we don't need to change the state + // so we can return here, no need to update anything and calculate as this info was fetched from the cache + reply.Data = outputFormatter(reply.Data) + relayResult = &common.RelayResult{ + Reply: reply, + Request: &pairingtypes.RelayRequest{ + RelayData: relayRequestData, + }, + Finalized: false, // set false to skip data reliability + ProviderInfo: common.ProviderInfo{ProviderAddress: ""}, + } + return relayResult, nil + } + // cache failed, move on to regular relay + if performance.NotConnectedError.Is(cacheError) { + utils.LavaFormatDebug("cache not connected", utils.LogAttr("error", cacheError)) } - return relayResult, nil - } - // cache failed, move on to regular relay - if performance.NotConnectedError.Is(cacheError) { - utils.LavaFormatDebug("cache not connected", utils.LogAttr("error", cacheError)) } } else { utils.LavaFormatDebug("skipping cache due to requested block being NOT_APPLICABLE", utils.Attribute{Key: "api name", Value: chainMessage.GetApi().Name}) @@ -604,26 +619,46 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(chainMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(relayTimeout), expectedBH, numOfProviders, pairingAddressesLen, chainMessage.GetApi().Category.HangingApi) // session done successfully if rpccs.cache.CacheActive() { - // copy private data so if it changes it doesn't panic mid async send - copyPrivateData := &pairingtypes.RelayPrivateData{} - copyRequestErr := protocopy.DeepCopyProtoObject(localRelayResult.Request.RelayData, copyPrivateData) + // copy reply data so if it changes it doesn't panic mid async send copyReply := &pairingtypes.RelayReply{} copyReplyErr := protocopy.DeepCopyProtoObject(localRelayResult.Reply, copyReply) // set cache in a non blocking call + + requestedBlock := localRelayResult.Request.RelayData.RequestBlock // get requested block before removing it from the data + seenBlock := localRelayResult.Request.RelayData.SeenBlock // get seen block before removing it from the data + hashKey, _, hashErr := chainlib.HashCacheRequest(localRelayResult.Request.RelayData, chainID) // get the hash (this changes the data) + go func() { // deal with copying error. - if copyRequestErr != nil || copyReplyErr != nil { - utils.LavaFormatError("Failed copying relay private data sendRelayToProvider", nil, utils.LogAttr("copyReplyErr", copyReplyErr), utils.LogAttr("copyRequestErr", copyRequestErr)) + if copyReplyErr != nil || hashErr != nil { + utils.LavaFormatError("Failed copying relay private data sendRelayToProvider", nil, + utils.LogAttr("copyReplyErr", copyReplyErr), + utils.LogAttr("hashErr", hashErr), + ) return } - requestedBlock, _ := chainMessage.RequestedBlock() - if requestedBlock == spectypes.NOT_APPLICABLE { + chainMessageRequestedBlock, _ := chainMessage.RequestedBlock() + if chainMessageRequestedBlock == spectypes.NOT_APPLICABLE { return } + new_ctx := context.Background() new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease) defer cancel() - err2 := rpccs.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{Request: copyPrivateData, BlockHash: nil, ChainID: chainID, Response: copyReply, Finalized: localRelayResult.Finalized, OptionalMetadata: nil, SharedStateId: sharedStateId}) // caching in the portal doesn't care about hashes + _, averageBlockTime, _, _ := rpccs.chainParser.ChainBlockStats() + + err2 := rpccs.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ + RequestHash: hashKey, + ChainId: chainID, + RequestedBlock: requestedBlock, + SeenBlock: seenBlock, + BlockHash: nil, // consumer cache doesn't care about block hashes + Response: copyReply, + Finalized: localRelayResult.Finalized, + OptionalMetadata: nil, + SharedStateId: sharedStateId, + AverageBlockTime: int64(averageBlockTime), // by using average block time we can set longer TTL + }) if err2 != nil { utils.LavaFormatWarning("error updating cache with new entry", err2) } @@ -920,7 +955,7 @@ func (rpccs *RPCConsumerServer) GetInitialUnwantedProviders(directiveHeaders map return unwantedProviders } -func (rpccs *RPCConsumerServer) getExtensionsFromDirectiveHeaders(latestBlock uint64, directiveHeaders map[string]string) extensionslib.ExtensionInfo { +func (rpccs *RPCConsumerServer) getExtensionsFromDirectiveHeaders(directiveHeaders map[string]string) extensionslib.ExtensionInfo { extensionsStr, ok := directiveHeaders[common.EXTENSION_OVERRIDE_HEADER_NAME] if ok { extensions := strings.Split(extensionsStr, ",") diff --git a/protocol/rpcconsumer/testing.go b/protocol/rpcconsumer/testing.go index 031e5b066c..c62bf01eba 100644 --- a/protocol/rpcconsumer/testing.go +++ b/protocol/rpcconsumer/testing.go @@ -9,7 +9,6 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" - "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/version" "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/chainlib/chainproxy" @@ -24,7 +23,7 @@ import ( "github.com/spf13/viper" ) -func startTesting(ctx context.Context, clientCtx client.Context, txFactory tx.Factory, rpcEndpoints []*lavasession.RPCProviderEndpoint, parallelConnections uint) error { +func startTesting(ctx context.Context, clientCtx client.Context, rpcEndpoints []*lavasession.RPCProviderEndpoint, parallelConnections uint) error { ctx, cancel := context.WithCancel(ctx) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt) @@ -156,17 +155,13 @@ func CreateTestRPCConsumerCobraCommand() *cobra.Command { } } clientCtx = clientCtx.WithChainID(networkChainId) - txFactory, err := tx.NewFactoryCLI(clientCtx, cmd.Flags()) - if err != nil { - utils.LavaFormatFatal("failed to create txFactory", err) - } utils.LavaFormatInfo("lavad Binary Version: " + version.Version) rand.InitRandomSeed() numberOfNodeParallelConnections, err := cmd.Flags().GetUint(chainproxy.ParallelConnectionsFlag) if err != nil { utils.LavaFormatFatal("error fetching chainproxy.ParallelConnectionsFlag", err) } - return startTesting(ctx, clientCtx, txFactory, modifiedProviderEndpoints, numberOfNodeParallelConnections) + return startTesting(ctx, clientCtx, modifiedProviderEndpoints, numberOfNodeParallelConnections) }, } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 9b968d79d3..a9d7a15d3a 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -721,6 +721,7 @@ rpcprovider 127.0.0.1:3333 COS3 tendermintrpc "wss://www.node-path.com:80,https: cmdRPCProvider.Flags().Bool(common.RelaysHealthEnableFlag, true, "enables relays health check") cmdRPCProvider.Flags().Duration(common.RelayHealthIntervalFlag, RelayHealthIntervalFlagDefault, "interval between relay health checks") cmdRPCProvider.Flags().String(HealthCheckURLPathFlagName, HealthCheckURLPathFlagDefault, "the url path for the provider's grpc health check") + cmdRPCProvider.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 93fe18e681..f435e07585 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -117,7 +117,7 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) { } rpcps.relaysMonitor.SetRelaySender(func() (bool, error) { - chainMessage, err := rpcps.craftChainMessage(ctx) + chainMessage, err := rpcps.craftChainMessage() if err != nil { return false, err } @@ -129,7 +129,7 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) { rpcps.relaysMonitor.Start(ctx) } -func (rpcps *RPCProviderServer) craftChainMessage(ctx context.Context) (chainMessage chainlib.ChainMessage, err error) { +func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.ChainMessage, err error) { parsing, collectionData, ok := rpcps.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM) if !ok { return nil, utils.LavaFormatWarning("did not send initial relays because the spec does not contain "+spectypes.FUNCTION_TAG_GET_BLOCKNUM.String(), nil, @@ -677,9 +677,9 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty var blockDistanceToFinalization uint32 var averageBlockTime time.Duration updatedChainMessage := false + var blockLagForQosSync int64 + blockLagForQosSync, averageBlockTime, blockDistanceToFinalization, blocksInFinalizationData = rpcps.chainParser.ChainBlockStats() if dataReliabilityEnabled { - var blockLagForQosSync int64 - blockLagForQosSync, averageBlockTime, blockDistanceToFinalization, blocksInFinalizationData = rpcps.chainParser.ChainBlockStats() var err error specificBlock := request.RelayData.RequestBlock if specificBlock < spectypes.LATEST_BLOCK { @@ -722,13 +722,29 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty ignoredMetadata := []pairingtypes.Metadata{} if requestedBlockHash != nil || finalized { var cacheReply *pairingtypes.CacheRelayReply - cacheCtx, cancel := context.WithTimeout(ctx, common.CacheTimeout) - cacheReply, err = cache.GetEntry(cacheCtx, &pairingtypes.RelayCacheGet{Request: request.RelayData, BlockHash: requestedBlockHash, ChainID: rpcps.rpcProviderEndpoint.ChainID, Finalized: finalized, Provider: rpcps.providerAddress.String()}) - cancel() - reply = cacheReply.GetReply() - ignoredMetadata = cacheReply.GetOptionalMetadata() - if err != nil && performance.NotConnectedError.Is(err) { - utils.LavaFormatDebug("cache not connected", utils.LogAttr("err", err), utils.Attribute{Key: "GUID", Value: ctx}) + + hashKey, outPutFormatter, hashErr := chainlib.HashCacheRequest(request.RelayData, rpcps.rpcProviderEndpoint.ChainID) + if hashErr != nil { + utils.LavaFormatError("TryRelay Failed computing hash for cache request", hashErr) + } else { + cacheCtx, cancel := context.WithTimeout(ctx, common.CacheTimeout) + cacheReply, err = cache.GetEntry(cacheCtx, &pairingtypes.RelayCacheGet{ + RequestHash: hashKey, + RequestedBlock: request.RelayData.RequestBlock, + ChainId: rpcps.rpcProviderEndpoint.ChainID, + BlockHash: requestedBlockHash, + Finalized: finalized, + SeenBlock: request.RelayData.SeenBlock, + }) + cancel() + reply = cacheReply.GetReply() + if reply != nil { + reply.Data = outPutFormatter(reply.Data) // setting request id back to reply. + } + ignoredMetadata = cacheReply.GetOptionalMetadata() + if err != nil && performance.NotConnectedError.Is(err) { + utils.LavaFormatDebug("cache not connected", utils.LogAttr("err", err), utils.Attribute{Key: "GUID", Value: ctx}) + } } } if err != nil || reply == nil { @@ -754,19 +770,33 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty // TODO: use overwriteReqBlock on the reply metadata to set the correct latest block if cache.CacheActive() && (requestedBlockHash != nil || finalized) { // copy request and reply as they change later on and we call SetEntry in a routine. - copyPrivateData := &pairingtypes.RelayPrivateData{} - copyRequestErr := protocopy.DeepCopyProtoObject(request.RelayData, copyPrivateData) + requestedBlock := request.RelayData.RequestBlock // get requested block before removing it from the data + hashKey, _, hashErr := chainlib.HashCacheRequest(request.RelayData, rpcps.rpcProviderEndpoint.ChainID) // get the hash (this changes the data) copyReply := &pairingtypes.RelayReply{} copyReplyErr := protocopy.DeepCopyProtoObject(reply, copyReply) go func() { - if copyRequestErr != nil || copyReplyErr != nil { - utils.LavaFormatError("Failed copying relay private data on TryRelay", nil, utils.LogAttr("copyReplyErr", copyReplyErr), utils.LogAttr("copyRequestErr", copyRequestErr)) + if hashErr != nil || copyReplyErr != nil { + utils.LavaFormatError("Failed copying relay private data on TryRelay", nil, utils.LogAttr("copyReplyErr", copyReplyErr), utils.LogAttr("hashErr", hashErr)) return } new_ctx := context.Background() new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease) defer cancel() - err := cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{Request: copyPrivateData, BlockHash: requestedBlockHash, ChainID: rpcps.rpcProviderEndpoint.ChainID, Response: copyReply, Finalized: finalized, Provider: rpcps.providerAddress.String(), OptionalMetadata: ignoredMetadata}) + if err != nil { + utils.LavaFormatError("TryRelay failed calculating hash for cach.SetEntry", err) + return + } + err = cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ + RequestHash: hashKey, + RequestedBlock: requestedBlock, + BlockHash: requestedBlockHash, + ChainId: rpcps.rpcProviderEndpoint.ChainID, + Response: copyReply, + Finalized: finalized, + OptionalMetadata: ignoredMetadata, + AverageBlockTime: int64(averageBlockTime), + SeenBlock: latestBlock, + }) if err != nil && request.RelaySession.Epoch != spectypes.NOT_APPLICABLE { utils.LavaFormatWarning("error updating cache with new entry", err, utils.Attribute{Key: "GUID", Value: ctx}) } diff --git a/protocol/rpcprovider/spec_validator.go b/protocol/rpcprovider/spec_validator.go index 4d5dd4f8ac..8e6b309269 100644 --- a/protocol/rpcprovider/spec_validator.go +++ b/protocol/rpcprovider/spec_validator.go @@ -128,12 +128,12 @@ func (sv *SpecValidator) validateAllChains(ctx context.Context) { } func (sv *SpecValidator) validateAllDisabledChains(ctx context.Context) { - for chainId := range sv.getDisabledChains(ctx) { + for chainId := range sv.getDisabledChains() { sv.validateChain(ctx, chainId) } } -func (sv *SpecValidator) getDisabledChains(ctx context.Context) map[string]struct{} { +func (sv *SpecValidator) getDisabledChains() map[string]struct{} { disabledChains := map[string]struct{}{} for _, chainFetchersList := range sv.chainFetchers { for _, chainFetcher := range chainFetchersList { diff --git a/protocol/rpcprovider/testing.go b/protocol/rpcprovider/testing.go index 12b4247846..b2654499f9 100644 --- a/protocol/rpcprovider/testing.go +++ b/protocol/rpcprovider/testing.go @@ -13,7 +13,6 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" - "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/version" "github.com/gogo/status" lvutil "github.com/lavanet/lava/ecosystem/lavavisor/pkg/util" @@ -101,7 +100,7 @@ func validateCORSHeaders(resp *http.Response) error { return nil } -func startTesting(ctx context.Context, clientCtx client.Context, txFactory tx.Factory, providerEntries []epochstoragetypes.StakeEntry) error { +func startTesting(ctx context.Context, clientCtx client.Context, providerEntries []epochstoragetypes.StakeEntry) error { ctx, cancel := context.WithCancel(ctx) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt) @@ -118,9 +117,6 @@ func startTesting(ctx context.Context, clientCtx client.Context, txFactory tx.Fa return err } lavaVersion := param.GetParams().Version - if err != nil { - utils.LavaFormatFatal("failed fetching protocol version from node", err) - } targetVersion := lvutil.ParseToSemanticVersion(lavaVersion.ProviderTarget) for _, providerEntry := range providerEntries { utils.LavaFormatInfo("checking provider entry", utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "endpoints", Value: providerEntry.Endpoints}) @@ -274,10 +270,6 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc utils.LavaFormatInfo("RPCProvider Test started", utils.Attribute{Key: "address", Value: address}) utils.SetGlobalLoggingLevel(logLevel) clientCtx = clientCtx.WithChainID(networkChainId) - txFactory, err := tx.NewFactoryCLI(clientCtx, cmd.Flags()) - if err != nil { - utils.LavaFormatFatal("failed to create txFactory", err) - } utils.LavaFormatInfo("lavad Binary Version: " + version.Version) rand.InitRandomSeed() @@ -359,7 +351,7 @@ rpcprovider --from providerWallet --endpoints "provider-public-grpc:port,jsonrpc utils.LavaFormatError("no active chains for provider", nil, utils.Attribute{Key: "address", Value: address}) } utils.LavaFormatDebug("checking chain entries", utils.Attribute{Key: "stakedProviderChains", Value: stakedProviderChains}) - return startTesting(ctx, clientCtx, txFactory, stakedProviderChains) + return startTesting(ctx, clientCtx, stakedProviderChains) }, } diff --git a/protocol/statetracker/updaters/event_tracker.go b/protocol/statetracker/updaters/event_tracker.go index d42d85f399..e606962a2a 100644 --- a/protocol/statetracker/updaters/event_tracker.go +++ b/protocol/statetracker/updaters/event_tracker.go @@ -23,6 +23,8 @@ const ( BlockResultRetry = 20 ) +var TimeOutForFetchingLavaBlocks = time.Second * 5 + type EventTracker struct { lock sync.RWMutex ClientCtx client.Context @@ -36,7 +38,7 @@ func (et *EventTracker) UpdateBlockResults(latestBlock int64) (err error) { if latestBlock == 0 { var res *ctypes.ResultStatus for i := 0; i < 3; i++ { - timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, TimeOutForFetchingLavaBlocks) res, err = et.ClientCtx.Client.Status(timeoutCtx) cancel() if err == nil { @@ -55,7 +57,7 @@ func (et *EventTracker) UpdateBlockResults(latestBlock int64) (err error) { } var blockResults *ctypes.ResultBlockResults for i := 0; i < BlockResultRetry; i++ { - timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, TimeOutForFetchingLavaBlocks) blockResults, err = brp.BlockResults(timeoutCtx, &latestBlock) cancel() if err == nil { diff --git a/scripts/pre_setups/init_lava_only_with_node_with_cache.sh b/scripts/pre_setups/init_lava_only_with_node_with_cache.sh index bfd44d9054..4e6de63d33 100755 --- a/scripts/pre_setups/init_lava_only_with_node_with_cache.sh +++ b/scripts/pre_setups/init_lava_only_with_node_with_cache.sh @@ -42,19 +42,24 @@ PROVIDER1_LISTENER="127.0.0.1:2220" lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE wait_next_block -lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --delegate-limit 1000ulava --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE wait_next_block sleep_until_next_epoch + +screen -d -m -S cache_consumer bash -c "source ~/.bashrc; lavap cache \ +127.0.0.1:20100 --metrics_address 0.0.0.0:20200 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_CONSUMER.log" && sleep 0.25 +sleep 2; +screen -d -m -S cache_provider bash -c "source ~/.bashrc; lavap cache \ +127.0.0.1:20101 --metrics_address 0.0.0.0:20201 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_PROVIDER.log" && sleep 0.25 +sleep 2; + screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --cache-be 127.0.0.1:20101 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 -screen -d -m -S cache bash -c "source ~/.bashrc; lavap cache \ -127.0.0.1:20100 --metrics_address 0.0.0.0:20200 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE.log" && sleep 0.25 -sleep 2; screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 diff --git a/x/pairing/types/relayCache.pb.go b/x/pairing/types/relayCache.pb.go index 5299e2d60a..35665e71bf 100644 --- a/x/pairing/types/relayCache.pb.go +++ b/x/pairing/types/relayCache.pb.go @@ -141,20 +141,74 @@ func (m *CacheUsage) GetCacheMisses() uint64 { return 0 } +// data containing the cache key hash +type CacheHash struct { + Request *RelayPrivateData `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + ChainId string `protobuf:"bytes,2,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` +} + +func (m *CacheHash) Reset() { *m = CacheHash{} } +func (m *CacheHash) String() string { return proto.CompactTextString(m) } +func (*CacheHash) ProtoMessage() {} +func (*CacheHash) Descriptor() ([]byte, []int) { + return fileDescriptor_36fbab536e2bbad1, []int{2} +} +func (m *CacheHash) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CacheHash) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CacheHash.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CacheHash) XXX_Merge(src proto.Message) { + xxx_messageInfo_CacheHash.Merge(m, src) +} +func (m *CacheHash) XXX_Size() int { + return m.Size() +} +func (m *CacheHash) XXX_DiscardUnknown() { + xxx_messageInfo_CacheHash.DiscardUnknown(m) +} + +var xxx_messageInfo_CacheHash proto.InternalMessageInfo + +func (m *CacheHash) GetRequest() *RelayPrivateData { + if m != nil { + return m.Request + } + return nil +} + +func (m *CacheHash) GetChainId() string { + if m != nil { + return m.ChainId + } + return "" +} + type RelayCacheGet struct { - Request *RelayPrivateData `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` - BlockHash []byte `protobuf:"bytes,2,opt,name=blockHash,proto3" json:"blockHash,omitempty"` - ChainID string `protobuf:"bytes,3,opt,name=chainID,proto3" json:"chainID,omitempty"` - Finalized bool `protobuf:"varint,4,opt,name=finalized,proto3" json:"finalized,omitempty"` - Provider string `protobuf:"bytes,5,opt,name=provider,proto3" json:"provider,omitempty"` - SharedStateId string `protobuf:"bytes,6,opt,name=shared_state_id,json=sharedStateId,proto3" json:"shared_state_id,omitempty"` + RequestHash []byte `protobuf:"bytes,1,opt,name=request_hash,json=requestHash,proto3" json:"request_hash,omitempty"` + BlockHash []byte `protobuf:"bytes,2,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + Finalized bool `protobuf:"varint,3,opt,name=finalized,proto3" json:"finalized,omitempty"` + RequestedBlock int64 `protobuf:"varint,4,opt,name=requested_block,json=requestedBlock,proto3" json:"requested_block,omitempty"` + SharedStateId string `protobuf:"bytes,5,opt,name=shared_state_id,json=sharedStateId,proto3" json:"shared_state_id,omitempty"` + ChainId string `protobuf:"bytes,6,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` + SeenBlock int64 `protobuf:"varint,7,opt,name=seen_block,json=seenBlock,proto3" json:"seen_block,omitempty"` } func (m *RelayCacheGet) Reset() { *m = RelayCacheGet{} } func (m *RelayCacheGet) String() string { return proto.CompactTextString(m) } func (*RelayCacheGet) ProtoMessage() {} func (*RelayCacheGet) Descriptor() ([]byte, []int) { - return fileDescriptor_36fbab536e2bbad1, []int{2} + return fileDescriptor_36fbab536e2bbad1, []int{3} } func (m *RelayCacheGet) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -183,9 +237,9 @@ func (m *RelayCacheGet) XXX_DiscardUnknown() { var xxx_messageInfo_RelayCacheGet proto.InternalMessageInfo -func (m *RelayCacheGet) GetRequest() *RelayPrivateData { +func (m *RelayCacheGet) GetRequestHash() []byte { if m != nil { - return m.Request + return m.RequestHash } return nil } @@ -197,13 +251,6 @@ func (m *RelayCacheGet) GetBlockHash() []byte { return nil } -func (m *RelayCacheGet) GetChainID() string { - if m != nil { - return m.ChainID - } - return "" -} - func (m *RelayCacheGet) GetFinalized() bool { if m != nil { return m.Finalized @@ -211,11 +258,11 @@ func (m *RelayCacheGet) GetFinalized() bool { return false } -func (m *RelayCacheGet) GetProvider() string { +func (m *RelayCacheGet) GetRequestedBlock() int64 { if m != nil { - return m.Provider + return m.RequestedBlock } - return "" + return 0 } func (m *RelayCacheGet) GetSharedStateId() string { @@ -225,22 +272,38 @@ func (m *RelayCacheGet) GetSharedStateId() string { return "" } +func (m *RelayCacheGet) GetChainId() string { + if m != nil { + return m.ChainId + } + return "" +} + +func (m *RelayCacheGet) GetSeenBlock() int64 { + if m != nil { + return m.SeenBlock + } + return 0 +} + type RelayCacheSet struct { - Request *RelayPrivateData `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` - BlockHash []byte `protobuf:"bytes,2,opt,name=blockHash,proto3" json:"blockHash,omitempty"` - ChainID string `protobuf:"bytes,3,opt,name=chainID,proto3" json:"chainID,omitempty"` - Response *RelayReply `protobuf:"bytes,4,opt,name=response,proto3" json:"response,omitempty"` - Finalized bool `protobuf:"varint,5,opt,name=finalized,proto3" json:"finalized,omitempty"` - Provider string `protobuf:"bytes,6,opt,name=provider,proto3" json:"provider,omitempty"` - OptionalMetadata []Metadata `protobuf:"bytes,7,rep,name=optional_metadata,json=optionalMetadata,proto3" json:"optional_metadata"` - SharedStateId string `protobuf:"bytes,8,opt,name=shared_state_id,json=sharedStateId,proto3" json:"shared_state_id,omitempty"` + RequestHash []byte `protobuf:"bytes,1,opt,name=request_hash,json=requestHash,proto3" json:"request_hash,omitempty"` + BlockHash []byte `protobuf:"bytes,2,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + Response *RelayReply `protobuf:"bytes,3,opt,name=response,proto3" json:"response,omitempty"` + Finalized bool `protobuf:"varint,4,opt,name=finalized,proto3" json:"finalized,omitempty"` + OptionalMetadata []Metadata `protobuf:"bytes,5,rep,name=optional_metadata,json=optionalMetadata,proto3" json:"optional_metadata"` + SharedStateId string `protobuf:"bytes,6,opt,name=shared_state_id,json=sharedStateId,proto3" json:"shared_state_id,omitempty"` + RequestedBlock int64 `protobuf:"varint,7,opt,name=requested_block,json=requestedBlock,proto3" json:"requested_block,omitempty"` + ChainId string `protobuf:"bytes,9,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` + SeenBlock int64 `protobuf:"varint,10,opt,name=seen_block,json=seenBlock,proto3" json:"seen_block,omitempty"` + AverageBlockTime int64 `protobuf:"varint,11,opt,name=average_block_time,json=averageBlockTime,proto3" json:"average_block_time,omitempty"` } func (m *RelayCacheSet) Reset() { *m = RelayCacheSet{} } func (m *RelayCacheSet) String() string { return proto.CompactTextString(m) } func (*RelayCacheSet) ProtoMessage() {} func (*RelayCacheSet) Descriptor() ([]byte, []int) { - return fileDescriptor_36fbab536e2bbad1, []int{3} + return fileDescriptor_36fbab536e2bbad1, []int{4} } func (m *RelayCacheSet) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -269,9 +332,9 @@ func (m *RelayCacheSet) XXX_DiscardUnknown() { var xxx_messageInfo_RelayCacheSet proto.InternalMessageInfo -func (m *RelayCacheSet) GetRequest() *RelayPrivateData { +func (m *RelayCacheSet) GetRequestHash() []byte { if m != nil { - return m.Request + return m.RequestHash } return nil } @@ -283,13 +346,6 @@ func (m *RelayCacheSet) GetBlockHash() []byte { return nil } -func (m *RelayCacheSet) GetChainID() string { - if m != nil { - return m.ChainID - } - return "" -} - func (m *RelayCacheSet) GetResponse() *RelayReply { if m != nil { return m.Response @@ -304,13 +360,6 @@ func (m *RelayCacheSet) GetFinalized() bool { return false } -func (m *RelayCacheSet) GetProvider() string { - if m != nil { - return m.Provider - } - return "" -} - func (m *RelayCacheSet) GetOptionalMetadata() []Metadata { if m != nil { return m.OptionalMetadata @@ -325,9 +374,38 @@ func (m *RelayCacheSet) GetSharedStateId() string { return "" } +func (m *RelayCacheSet) GetRequestedBlock() int64 { + if m != nil { + return m.RequestedBlock + } + return 0 +} + +func (m *RelayCacheSet) GetChainId() string { + if m != nil { + return m.ChainId + } + return "" +} + +func (m *RelayCacheSet) GetSeenBlock() int64 { + if m != nil { + return m.SeenBlock + } + return 0 +} + +func (m *RelayCacheSet) GetAverageBlockTime() int64 { + if m != nil { + return m.AverageBlockTime + } + return 0 +} + func init() { proto.RegisterType((*CacheRelayReply)(nil), "lavanet.lava.pairing.CacheRelayReply") proto.RegisterType((*CacheUsage)(nil), "lavanet.lava.pairing.CacheUsage") + proto.RegisterType((*CacheHash)(nil), "lavanet.lava.pairing.CacheHash") proto.RegisterType((*RelayCacheGet)(nil), "lavanet.lava.pairing.RelayCacheGet") proto.RegisterType((*RelayCacheSet)(nil), "lavanet.lava.pairing.RelayCacheSet") } @@ -337,43 +415,47 @@ func init() { } var fileDescriptor_36fbab536e2bbad1 = []byte{ - // 571 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x54, 0xcd, 0x6e, 0xd3, 0x4c, - 0x14, 0xb5, 0xdb, 0xfc, 0x38, 0x93, 0x56, 0xfd, 0xbe, 0x51, 0x85, 0x2c, 0x03, 0xc6, 0x32, 0x6a, - 0xc9, 0xca, 0x96, 0x82, 0xc4, 0x8a, 0x05, 0x84, 0xa0, 0x26, 0x12, 0x95, 0xc0, 0x11, 0x12, 0x62, - 0x13, 0x4d, 0xe2, 0x5b, 0x7b, 0x84, 0xe3, 0x31, 0x9e, 0x49, 0x44, 0x78, 0x0a, 0xde, 0x87, 0x17, - 0xe8, 0xb2, 0x4b, 0x56, 0x08, 0x25, 0x4b, 0x9e, 0x00, 0x89, 0x05, 0xf2, 0xc4, 0x4e, 0xda, 0xca, - 0x8d, 0x2a, 0xb1, 0x60, 0x65, 0xdf, 0x33, 0xe7, 0xcc, 0xdc, 0x7b, 0xe6, 0xce, 0x45, 0x47, 0x11, - 0x99, 0x91, 0x18, 0x84, 0x9b, 0x7d, 0xdd, 0x84, 0xd0, 0x94, 0xc6, 0x81, 0x9b, 0x42, 0x44, 0xe6, - 0x2f, 0xc8, 0x38, 0x04, 0x27, 0x49, 0x99, 0x60, 0xf8, 0x30, 0xa7, 0x39, 0xd9, 0xd7, 0xc9, 0x69, - 0xc6, 0x61, 0xc0, 0x02, 0x26, 0x09, 0x6e, 0xf6, 0xb7, 0xe2, 0x1a, 0xd6, 0xcd, 0x5b, 0xe6, 0x8c, - 0xbb, 0x01, 0x63, 0x41, 0x04, 0xae, 0x8c, 0x46, 0xd3, 0x33, 0x17, 0x26, 0x89, 0xc8, 0x17, 0xed, - 0xaf, 0x2a, 0x3a, 0x90, 0x47, 0x7b, 0x99, 0xc2, 0x83, 0x24, 0x9a, 0xe3, 0x27, 0xa8, 0x9a, 0x66, - 0x3f, 0xba, 0x6a, 0xa9, 0xad, 0x66, 0xdb, 0x72, 0xca, 0xd2, 0x71, 0x36, 0x02, 0x6f, 0x45, 0xc7, - 0x6f, 0xd0, 0xff, 0x2c, 0x11, 0x94, 0xc5, 0x24, 0x1a, 0x4e, 0x40, 0x10, 0x9f, 0x08, 0xa2, 0xef, - 0x58, 0xbb, 0xad, 0x66, 0xdb, 0x2c, 0xdf, 0xe3, 0x34, 0x67, 0x75, 0x2a, 0xe7, 0xdf, 0x1f, 0x28, - 0xde, 0x7f, 0x85, 0xbc, 0xc0, 0xf1, 0x7d, 0x84, 0x38, 0x40, 0x3c, 0x1c, 0x45, 0x6c, 0xfc, 0x41, - 0xdf, 0xb5, 0xd4, 0xd6, 0xae, 0xd7, 0xc8, 0x90, 0x4e, 0x06, 0xd8, 0xaf, 0x10, 0x92, 0xc9, 0xbf, - 0xe5, 0x24, 0x00, 0x7c, 0x0f, 0x35, 0x64, 0xd4, 0xa3, 0x82, 0xcb, 0xdc, 0x2b, 0xde, 0x06, 0xc0, - 0x16, 0x6a, 0xca, 0xe0, 0x94, 0x72, 0x0e, 0x5c, 0xdf, 0x91, 0xeb, 0x97, 0x21, 0xfb, 0xa7, 0x8a, - 0xf6, 0xbd, 0xf5, 0x5d, 0x9c, 0x80, 0xc0, 0xcf, 0x50, 0x3d, 0x85, 0x8f, 0x53, 0xe0, 0x22, 0xf7, - 0xe2, 0x78, 0x8b, 0x17, 0xaf, 0x53, 0x3a, 0x23, 0x02, 0xba, 0x44, 0x10, 0xaf, 0x90, 0x65, 0x39, - 0xc9, 0xdc, 0x7b, 0x84, 0x87, 0xf2, 0xcc, 0x3d, 0x6f, 0x03, 0x60, 0x1d, 0xd5, 0xc7, 0x21, 0xa1, - 0x71, 0xbf, 0x2b, 0x6b, 0x6b, 0x78, 0x45, 0x98, 0xe9, 0xce, 0x68, 0x4c, 0x22, 0xfa, 0x19, 0x7c, - 0xbd, 0x62, 0xa9, 0x2d, 0xcd, 0xdb, 0x00, 0xd8, 0x40, 0x5a, 0x92, 0xb2, 0x19, 0xf5, 0x21, 0xd5, - 0xab, 0x52, 0xb8, 0x8e, 0xf1, 0x31, 0x3a, 0xe0, 0x21, 0x49, 0xc1, 0x1f, 0x72, 0x41, 0x04, 0x0c, - 0xa9, 0xaf, 0xd7, 0x24, 0x65, 0x7f, 0x05, 0x0f, 0x32, 0xb4, 0xef, 0xdb, 0xbf, 0x77, 0x2e, 0x57, - 0x3b, 0xf8, 0xa7, 0xd5, 0x3e, 0x45, 0x5a, 0x0a, 0x3c, 0x61, 0x31, 0x07, 0x59, 0xec, 0x6d, 0x9a, - 0x6e, 0xad, 0xb8, 0xea, 0x55, 0x75, 0x9b, 0x57, 0xb5, 0x6b, 0x5e, 0x95, 0x76, 0x6c, 0xfd, 0xaf, - 0x3a, 0xb6, 0xc4, 0x7e, 0xad, 0xc4, 0xfe, 0xf6, 0x2f, 0x15, 0xed, 0xc9, 0x6a, 0x20, 0x95, 0x17, - 0x80, 0xdf, 0x21, 0xed, 0x04, 0x84, 0x84, 0xf0, 0xc3, 0x2d, 0xd5, 0x17, 0xcd, 0x69, 0x1c, 0x95, - 0x93, 0xae, 0xbd, 0x66, 0x5b, 0xc1, 0x7d, 0xa4, 0x0d, 0x6e, 0xbd, 0xf3, 0x00, 0x84, 0x71, 0xc7, - 0x59, 0x8d, 0x0c, 0xa7, 0x18, 0x19, 0xce, 0xcb, 0x6c, 0x64, 0xd8, 0x0a, 0xee, 0xa2, 0x5a, 0x0f, - 0x48, 0x24, 0x42, 0x7c, 0x03, 0xc7, 0xb0, 0xb6, 0x64, 0x25, 0x9f, 0xa9, 0xad, 0x74, 0x9e, 0x9f, - 0x2f, 0x4c, 0xf5, 0x62, 0x61, 0xaa, 0x3f, 0x16, 0xa6, 0xfa, 0x65, 0x69, 0x2a, 0x17, 0x4b, 0x53, - 0xf9, 0xb6, 0x34, 0x95, 0xf7, 0x8f, 0x02, 0x2a, 0xc2, 0xe9, 0xc8, 0x19, 0xb3, 0x89, 0x7b, 0x65, - 0xb0, 0x7d, 0x5a, 0x8f, 0x36, 0x31, 0x4f, 0x80, 0x8f, 0x6a, 0xf2, 0xd8, 0xc7, 0x7f, 0x02, 0x00, - 0x00, 0xff, 0xff, 0xad, 0x47, 0x23, 0x0c, 0x52, 0x05, 0x00, 0x00, + // 638 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xb6, 0x9b, 0x34, 0x3f, 0x93, 0x96, 0x96, 0x55, 0x85, 0x42, 0x68, 0x8d, 0x31, 0xea, 0xcf, + 0x01, 0xd9, 0x52, 0x91, 0x38, 0x71, 0x80, 0x52, 0xd4, 0x56, 0xa2, 0x12, 0x38, 0x20, 0x21, 0x2e, + 0xd1, 0x26, 0x9e, 0xda, 0x2b, 0x1c, 0xdb, 0x78, 0xb7, 0x15, 0xe5, 0x29, 0x78, 0x1d, 0xc4, 0x0b, + 0xf4, 0xd8, 0x23, 0x27, 0x84, 0xda, 0xa7, 0x80, 0x13, 0xf2, 0xc4, 0x49, 0x93, 0xc8, 0x8d, 0x2a, + 0xc1, 0xc9, 0xde, 0x6f, 0xbf, 0xd9, 0x99, 0xf9, 0xbe, 0xd1, 0xc0, 0x7a, 0xc8, 0x4f, 0x78, 0x84, + 0xca, 0xc9, 0xbe, 0x4e, 0xc2, 0x45, 0x2a, 0x22, 0xdf, 0x49, 0x31, 0xe4, 0xa7, 0x2f, 0x78, 0x2f, + 0x40, 0x3b, 0x49, 0x63, 0x15, 0xb3, 0x95, 0x9c, 0x66, 0x67, 0x5f, 0x3b, 0xa7, 0xb5, 0x56, 0xfc, + 0xd8, 0x8f, 0x89, 0xe0, 0x64, 0x7f, 0x03, 0x6e, 0xcb, 0xbc, 0xfe, 0xc9, 0x9c, 0x71, 0xcf, 0x8f, + 0x63, 0x3f, 0x44, 0x87, 0x4e, 0xdd, 0xe3, 0x23, 0x07, 0xfb, 0x89, 0xca, 0x2f, 0xad, 0xef, 0x3a, + 0x2c, 0x51, 0x6a, 0x37, 0x8b, 0x70, 0x31, 0x09, 0x4f, 0xd9, 0x13, 0x98, 0x4f, 0xb3, 0x9f, 0xa6, + 0x6e, 0xea, 0x5b, 0x8d, 0x6d, 0xd3, 0x2e, 0x2a, 0xc7, 0xbe, 0x0a, 0x70, 0x07, 0x74, 0xf6, 0x06, + 0x6e, 0xc7, 0x89, 0x12, 0x71, 0xc4, 0xc3, 0x4e, 0x1f, 0x15, 0xf7, 0xb8, 0xe2, 0xcd, 0x39, 0xb3, + 0xb4, 0xd5, 0xd8, 0x36, 0x8a, 0xdf, 0x38, 0xcc, 0x59, 0x3b, 0xe5, 0xb3, 0x9f, 0xf7, 0x35, 0x77, + 0x79, 0x18, 0x3e, 0xc4, 0xd9, 0x1a, 0x80, 0x44, 0x8c, 0x3a, 0xdd, 0x30, 0xee, 0x7d, 0x6c, 0x96, + 0x4c, 0x7d, 0xab, 0xe4, 0xd6, 0x33, 0x64, 0x27, 0x03, 0xac, 0x57, 0x00, 0x54, 0xfc, 0x3b, 0xc9, + 0x7d, 0x64, 0xab, 0x50, 0xa7, 0xd3, 0xbe, 0x50, 0x92, 0x6a, 0x2f, 0xbb, 0x57, 0x00, 0x33, 0xa1, + 0x41, 0x87, 0x43, 0x21, 0x25, 0xca, 0xe6, 0x1c, 0xdd, 0x8f, 0x43, 0x56, 0x30, 0x8c, 0xe7, 0x32, + 0x60, 0xcf, 0xa0, 0x9a, 0xe2, 0xa7, 0x63, 0x94, 0x2a, 0x97, 0x61, 0x63, 0x86, 0x0c, 0xaf, 0x53, + 0x71, 0xc2, 0x15, 0xee, 0x72, 0xc5, 0xdd, 0x61, 0x18, 0xbb, 0x0b, 0xb5, 0x5e, 0xc0, 0x45, 0xd4, + 0x11, 0x1e, 0x65, 0xab, 0xbb, 0x55, 0x3a, 0x1f, 0x78, 0xd6, 0x1f, 0x1d, 0x16, 0xdd, 0x91, 0xeb, + 0x7b, 0xa8, 0xd8, 0x03, 0x58, 0xc8, 0xe3, 0x3a, 0x01, 0x97, 0x01, 0xe5, 0x5c, 0x70, 0x1b, 0x39, + 0x46, 0x15, 0xad, 0x01, 0x90, 0x0c, 0x03, 0xc2, 0x1c, 0x11, 0xea, 0x84, 0xd0, 0xf5, 0x2a, 0xd4, + 0x8f, 0x44, 0xc4, 0x43, 0xf1, 0x05, 0x3d, 0x52, 0xaa, 0xe6, 0x5e, 0x01, 0x6c, 0x13, 0x96, 0xf2, + 0xb7, 0xd0, 0xcb, 0xd5, 0x2c, 0x93, 0x9a, 0xb7, 0x46, 0x30, 0x49, 0xca, 0x36, 0x60, 0x49, 0x06, + 0x3c, 0x45, 0xaf, 0x23, 0x15, 0x57, 0x98, 0x15, 0x3f, 0x4f, 0xc5, 0x2f, 0x0e, 0xe0, 0x76, 0x86, + 0x1e, 0x78, 0x13, 0xdd, 0x55, 0x26, 0xba, 0x9b, 0x32, 0xad, 0x3a, 0x6d, 0xda, 0xb7, 0xd2, 0x78, + 0xf3, 0xed, 0xff, 0xd2, 0xfc, 0x53, 0xa8, 0xa5, 0x28, 0x93, 0x38, 0x92, 0x48, 0xbd, 0xdf, 0x64, + 0x6a, 0x47, 0x11, 0x93, 0xd2, 0x95, 0xa7, 0xa5, 0x2b, 0x1c, 0xeb, 0xf9, 0x7f, 0x1a, 0xeb, 0x02, + 0x91, 0x2b, 0x45, 0x22, 0x17, 0xb8, 0x56, 0x2d, 0x74, 0x6d, 0xdc, 0x8d, 0xfa, 0x2c, 0x37, 0x60, + 0xca, 0x0d, 0xf6, 0x08, 0x18, 0x3f, 0xc1, 0x94, 0xfb, 0x38, 0x60, 0x74, 0x94, 0xe8, 0x63, 0xb3, + 0x41, 0xb4, 0xe5, 0xfc, 0x86, 0x98, 0x6f, 0x45, 0x1f, 0xb7, 0x7f, 0xeb, 0xb0, 0x40, 0x12, 0x62, + 0x4a, 0xee, 0xb1, 0xf7, 0x50, 0xdb, 0x43, 0x45, 0x10, 0x7b, 0x38, 0x43, 0xf2, 0xe1, 0xa0, 0xb7, + 0xd6, 0x8b, 0x49, 0x53, 0x3b, 0xc8, 0xd2, 0xd8, 0x01, 0xd4, 0xda, 0x37, 0x7e, 0xb9, 0x8d, 0xaa, + 0x75, 0xc7, 0x1e, 0x2c, 0x3a, 0x7b, 0xb8, 0xe8, 0xec, 0x97, 0xd9, 0xa2, 0xb3, 0x34, 0xb6, 0x0b, + 0x95, 0x7d, 0xe4, 0xa1, 0x0a, 0xd8, 0x35, 0x9c, 0x96, 0x39, 0xa3, 0x2a, 0x5a, 0x2e, 0x96, 0xb6, + 0xf3, 0xfc, 0xec, 0xc2, 0xd0, 0xcf, 0x2f, 0x0c, 0xfd, 0xd7, 0x85, 0xa1, 0x7f, 0xbd, 0x34, 0xb4, + 0xf3, 0x4b, 0x43, 0xfb, 0x71, 0x69, 0x68, 0x1f, 0x36, 0x7d, 0xa1, 0x82, 0xe3, 0xae, 0xdd, 0x8b, + 0xfb, 0xce, 0xc4, 0x3a, 0xfe, 0x3c, 0x5a, 0xc8, 0xea, 0x34, 0x41, 0xd9, 0xad, 0x50, 0xda, 0xc7, + 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x81, 0x66, 0x88, 0xc1, 0x08, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -615,6 +697,48 @@ func (m *CacheUsage) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *CacheHash) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CacheHash) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CacheHash) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ChainId) > 0 { + i -= len(m.ChainId) + copy(dAtA[i:], m.ChainId) + i = encodeVarintRelayCache(dAtA, i, uint64(len(m.ChainId))) + i-- + dAtA[i] = 0x12 + } + if m.Request != nil { + { + size, err := m.Request.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRelayCache(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *RelayCacheGet) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -635,19 +759,29 @@ func (m *RelayCacheGet) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.SeenBlock != 0 { + i = encodeVarintRelayCache(dAtA, i, uint64(m.SeenBlock)) + i-- + dAtA[i] = 0x38 + } + if len(m.ChainId) > 0 { + i -= len(m.ChainId) + copy(dAtA[i:], m.ChainId) + i = encodeVarintRelayCache(dAtA, i, uint64(len(m.ChainId))) + i-- + dAtA[i] = 0x32 + } if len(m.SharedStateId) > 0 { i -= len(m.SharedStateId) copy(dAtA[i:], m.SharedStateId) i = encodeVarintRelayCache(dAtA, i, uint64(len(m.SharedStateId))) i-- - dAtA[i] = 0x32 + dAtA[i] = 0x2a } - if len(m.Provider) > 0 { - i -= len(m.Provider) - copy(dAtA[i:], m.Provider) - i = encodeVarintRelayCache(dAtA, i, uint64(len(m.Provider))) + if m.RequestedBlock != 0 { + i = encodeVarintRelayCache(dAtA, i, uint64(m.RequestedBlock)) i-- - dAtA[i] = 0x2a + dAtA[i] = 0x20 } if m.Finalized { i-- @@ -657,14 +791,7 @@ func (m *RelayCacheGet) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- - dAtA[i] = 0x20 - } - if len(m.ChainID) > 0 { - i -= len(m.ChainID) - copy(dAtA[i:], m.ChainID) - i = encodeVarintRelayCache(dAtA, i, uint64(len(m.ChainID))) - i-- - dAtA[i] = 0x1a + dAtA[i] = 0x18 } if len(m.BlockHash) > 0 { i -= len(m.BlockHash) @@ -673,15 +800,10 @@ func (m *RelayCacheGet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x12 } - if m.Request != nil { - { - size, err := m.Request.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintRelayCache(dAtA, i, uint64(size)) - } + if len(m.RequestHash) > 0 { + i -= len(m.RequestHash) + copy(dAtA[i:], m.RequestHash) + i = encodeVarintRelayCache(dAtA, i, uint64(len(m.RequestHash))) i-- dAtA[i] = 0xa } @@ -708,12 +830,34 @@ func (m *RelayCacheSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.AverageBlockTime != 0 { + i = encodeVarintRelayCache(dAtA, i, uint64(m.AverageBlockTime)) + i-- + dAtA[i] = 0x58 + } + if m.SeenBlock != 0 { + i = encodeVarintRelayCache(dAtA, i, uint64(m.SeenBlock)) + i-- + dAtA[i] = 0x50 + } + if len(m.ChainId) > 0 { + i -= len(m.ChainId) + copy(dAtA[i:], m.ChainId) + i = encodeVarintRelayCache(dAtA, i, uint64(len(m.ChainId))) + i-- + dAtA[i] = 0x4a + } + if m.RequestedBlock != 0 { + i = encodeVarintRelayCache(dAtA, i, uint64(m.RequestedBlock)) + i-- + dAtA[i] = 0x38 + } if len(m.SharedStateId) > 0 { i -= len(m.SharedStateId) copy(dAtA[i:], m.SharedStateId) i = encodeVarintRelayCache(dAtA, i, uint64(len(m.SharedStateId))) i-- - dAtA[i] = 0x42 + dAtA[i] = 0x32 } if len(m.OptionalMetadata) > 0 { for iNdEx := len(m.OptionalMetadata) - 1; iNdEx >= 0; iNdEx-- { @@ -726,16 +870,9 @@ func (m *RelayCacheSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintRelayCache(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x3a + dAtA[i] = 0x2a } } - if len(m.Provider) > 0 { - i -= len(m.Provider) - copy(dAtA[i:], m.Provider) - i = encodeVarintRelayCache(dAtA, i, uint64(len(m.Provider))) - i-- - dAtA[i] = 0x32 - } if m.Finalized { i-- if m.Finalized { @@ -744,7 +881,7 @@ func (m *RelayCacheSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- - dAtA[i] = 0x28 + dAtA[i] = 0x20 } if m.Response != nil { { @@ -756,13 +893,6 @@ func (m *RelayCacheSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintRelayCache(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x22 - } - if len(m.ChainID) > 0 { - i -= len(m.ChainID) - copy(dAtA[i:], m.ChainID) - i = encodeVarintRelayCache(dAtA, i, uint64(len(m.ChainID))) - i-- dAtA[i] = 0x1a } if len(m.BlockHash) > 0 { @@ -772,15 +902,10 @@ func (m *RelayCacheSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x12 } - if m.Request != nil { - { - size, err := m.Request.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintRelayCache(dAtA, i, uint64(size)) - } + if len(m.RequestHash) > 0 { + i -= len(m.RequestHash) + copy(dAtA[i:], m.RequestHash) + i = encodeVarintRelayCache(dAtA, i, uint64(len(m.RequestHash))) i-- dAtA[i] = 0xa } @@ -835,7 +960,7 @@ func (m *CacheUsage) Size() (n int) { return n } -func (m *RelayCacheGet) Size() (n int) { +func (m *CacheHash) Size() (n int) { if m == nil { return 0 } @@ -845,25 +970,44 @@ func (m *RelayCacheGet) Size() (n int) { l = m.Request.Size() n += 1 + l + sovRelayCache(uint64(l)) } - l = len(m.BlockHash) + l = len(m.ChainId) if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } - l = len(m.ChainID) + return n +} + +func (m *RelayCacheGet) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.RequestHash) + if l > 0 { + n += 1 + l + sovRelayCache(uint64(l)) + } + l = len(m.BlockHash) if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } if m.Finalized { n += 2 } - l = len(m.Provider) + if m.RequestedBlock != 0 { + n += 1 + sovRelayCache(uint64(m.RequestedBlock)) + } + l = len(m.SharedStateId) if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } - l = len(m.SharedStateId) + l = len(m.ChainId) if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } + if m.SeenBlock != 0 { + n += 1 + sovRelayCache(uint64(m.SeenBlock)) + } return n } @@ -873,15 +1017,11 @@ func (m *RelayCacheSet) Size() (n int) { } var l int _ = l - if m.Request != nil { - l = m.Request.Size() - n += 1 + l + sovRelayCache(uint64(l)) - } - l = len(m.BlockHash) + l = len(m.RequestHash) if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } - l = len(m.ChainID) + l = len(m.BlockHash) if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } @@ -892,10 +1032,6 @@ func (m *RelayCacheSet) Size() (n int) { if m.Finalized { n += 2 } - l = len(m.Provider) - if l > 0 { - n += 1 + l + sovRelayCache(uint64(l)) - } if len(m.OptionalMetadata) > 0 { for _, e := range m.OptionalMetadata { l = e.Size() @@ -906,6 +1042,19 @@ func (m *RelayCacheSet) Size() (n int) { if l > 0 { n += 1 + l + sovRelayCache(uint64(l)) } + if m.RequestedBlock != 0 { + n += 1 + sovRelayCache(uint64(m.RequestedBlock)) + } + l = len(m.ChainId) + if l > 0 { + n += 1 + l + sovRelayCache(uint64(l)) + } + if m.SeenBlock != 0 { + n += 1 + sovRelayCache(uint64(m.SeenBlock)) + } + if m.AverageBlockTime != 0 { + n += 1 + sovRelayCache(uint64(m.AverageBlockTime)) + } return n } @@ -1142,7 +1291,7 @@ func (m *CacheUsage) Unmarshal(dAtA []byte) error { } return nil } -func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { +func (m *CacheHash) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -1165,10 +1314,10 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: RelayCacheGet: wiretype end group for non-group") + return fmt.Errorf("proto: CacheHash: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: RelayCacheGet: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: CacheHash: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -1209,7 +1358,89 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field BlockHash", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ChainId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRelayCache + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRelayCache + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ChainId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRelayCache(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRelayCache + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RelayCacheGet: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RelayCacheGet: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestHash", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -1236,16 +1467,16 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.BlockHash = append(m.BlockHash[:0], dAtA[iNdEx:postIndex]...) - if m.BlockHash == nil { - m.BlockHash = []byte{} + m.RequestHash = append(m.RequestHash[:0], dAtA[iNdEx:postIndex]...) + if m.RequestHash == nil { + m.RequestHash = []byte{} } iNdEx = postIndex - case 3: + case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ChainID", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field BlockHash", wireType) } - var stringLen uint64 + var byteLen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowRelayCache @@ -1255,25 +1486,27 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= uint64(b&0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } } - intStringLen := int(stringLen) - if intStringLen < 0 { + if byteLen < 0 { return ErrInvalidLengthRelayCache } - postIndex := iNdEx + intStringLen + postIndex := iNdEx + byteLen if postIndex < 0 { return ErrInvalidLengthRelayCache } if postIndex > l { return io.ErrUnexpectedEOF } - m.ChainID = string(dAtA[iNdEx:postIndex]) + m.BlockHash = append(m.BlockHash[:0], dAtA[iNdEx:postIndex]...) + if m.BlockHash == nil { + m.BlockHash = []byte{} + } iNdEx = postIndex - case 4: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Finalized", wireType) } @@ -1293,9 +1526,28 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { } } m.Finalized = bool(v != 0) + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestedBlock", wireType) + } + m.RequestedBlock = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestedBlock |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } case 5: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Provider", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SharedStateId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -1323,11 +1575,11 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Provider = string(dAtA[iNdEx:postIndex]) + m.SharedStateId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 6: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SharedStateId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ChainId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -1355,8 +1607,27 @@ func (m *RelayCacheGet) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.SharedStateId = string(dAtA[iNdEx:postIndex]) + m.ChainId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SeenBlock", wireType) + } + m.SeenBlock = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SeenBlock |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRelayCache(dAtA[iNdEx:]) @@ -1409,9 +1680,9 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Request", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field RequestHash", wireType) } - var msglen int + var byteLen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowRelayCache @@ -1421,26 +1692,24 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= int(b&0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { + if byteLen < 0 { return ErrInvalidLengthRelayCache } - postIndex := iNdEx + msglen + postIndex := iNdEx + byteLen if postIndex < 0 { return ErrInvalidLengthRelayCache } if postIndex > l { return io.ErrUnexpectedEOF } - if m.Request == nil { - m.Request = &RelayPrivateData{} - } - if err := m.Request.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err + m.RequestHash = append(m.RequestHash[:0], dAtA[iNdEx:postIndex]...) + if m.RequestHash == nil { + m.RequestHash = []byte{} } iNdEx = postIndex case 2: @@ -1478,38 +1747,6 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ChainID", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowRelayCache - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthRelayCache - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthRelayCache - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.ChainID = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType) } @@ -1545,7 +1782,7 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 5: + case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Finalized", wireType) } @@ -1565,11 +1802,11 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { } } m.Finalized = bool(v != 0) - case 6: + case 5: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Provider", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OptionalMetadata", wireType) } - var stringLen uint64 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowRelayCache @@ -1579,29 +1816,31 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= uint64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } - intStringLen := int(stringLen) - if intStringLen < 0 { + if msglen < 0 { return ErrInvalidLengthRelayCache } - postIndex := iNdEx + intStringLen + postIndex := iNdEx + msglen if postIndex < 0 { return ErrInvalidLengthRelayCache } if postIndex > l { return io.ErrUnexpectedEOF } - m.Provider = string(dAtA[iNdEx:postIndex]) + m.OptionalMetadata = append(m.OptionalMetadata, Metadata{}) + if err := m.OptionalMetadata[len(m.OptionalMetadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } iNdEx = postIndex - case 7: + case 6: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field OptionalMetadata", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SharedStateId", wireType) } - var msglen int + var stringLen uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowRelayCache @@ -1611,29 +1850,46 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= int(b&0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { + intStringLen := int(stringLen) + if intStringLen < 0 { return ErrInvalidLengthRelayCache } - postIndex := iNdEx + msglen + postIndex := iNdEx + intStringLen if postIndex < 0 { return ErrInvalidLengthRelayCache } if postIndex > l { return io.ErrUnexpectedEOF } - m.OptionalMetadata = append(m.OptionalMetadata, Metadata{}) - if err := m.OptionalMetadata[len(m.OptionalMetadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } + m.SharedStateId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 8: + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestedBlock", wireType) + } + m.RequestedBlock = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestedBlock |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 9: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SharedStateId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ChainId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -1661,8 +1917,46 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.SharedStateId = string(dAtA[iNdEx:postIndex]) + m.ChainId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SeenBlock", wireType) + } + m.SeenBlock = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SeenBlock |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AverageBlockTime", wireType) + } + m.AverageBlockTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AverageBlockTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRelayCache(dAtA[iNdEx:]) diff --git a/x/protocol/types/params.go b/x/protocol/types/params.go index a480ba6c09..aa1050628e 100644 --- a/x/protocol/types/params.go +++ b/x/protocol/types/params.go @@ -12,7 +12,7 @@ import ( var _ paramtypes.ParamSet = (*Params)(nil) const ( - TARGET_VERSION = "1.0.0" + TARGET_VERSION = "1.0.2" MIN_VERSION = "0.35.6" )