diff --git a/cookbook/specs/near.json b/cookbook/specs/near.json index 037335089c..3deff64338 100644 --- a/cookbook/specs/near.json +++ b/cookbook/specs/near.json @@ -46,7 +46,15 @@ "subscription": false, "stateful": 0 }, - "extra_compute_units": 0 + "extra_compute_units": 0, + "parsers": [ + { + "parse_path": ".params.finality", + "value": "latest", + "rule": "=final || =optimistic", + "parse_type": "DEFAULT_VALUE" + } + ] }, { "name": "EXPERIMENTAL_changes", @@ -339,7 +347,13 @@ "hanging_api": true }, "extra_compute_units": 0, - "timeout_ms": 10000 + "timeout_ms": 10000, + "parsers": [ + { + "parse_path": ".params.[0]", + "parse_type": "BLOCK_HASH" + } + ] }, { "name": "EXPERIMENTAL_tx_status", @@ -357,7 +371,13 @@ "subscription": false, "stateful": 0 }, - "extra_compute_units": 0 + "extra_compute_units": 0, + "parsers": [ + { + "parse_path": ".params.tx_hash", + "parse_type": "BLOCK_HASH" + } + ] }, { "name": "EXPERIMENTAL_receipt", diff --git a/ecosystem/cache/cache_test.go b/ecosystem/cache/cache_test.go index 3f6e763a4e..1f2cfbbbbc 100644 --- a/ecosystem/cache/cache_test.go +++ b/ecosystem/cache/cache_test.go @@ -31,7 +31,16 @@ const ( func initTest() (context.Context, *cache.RelayerCacheServer) { ctx := context.Background() cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024} - cs.InitCache(ctx, cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DisabledFlagOption, cache.DefaultExpirationTimeFinalizedMultiplier, cache.DefaultExpirationTimeNonFinalizedMultiplier) + cs.InitCache( + ctx, + cache.DefaultExpirationTimeFinalized, + cache.DefaultExpirationForNonFinalized, + cache.DefaultExpirationNodeErrors, + cache.DefaultExpirationBlocksHashesToHeights, + cache.DisabledFlagOption, + cache.DefaultExpirationTimeFinalizedMultiplier, + cache.DefaultExpirationTimeNonFinalizedMultiplier, + ) cacheServer := &cache.RelayerCacheServer{CacheServer: &cs} return ctx, cacheServer } @@ -85,11 +94,12 @@ func TestCacheSetGet(t *testing.T) { Finalized: tt.finalized, RequestedBlock: request.RequestBlock, } - _, err = cacheServer.GetRelay(ctx, &messageGet) + reply, err := cacheServer.GetRelay(ctx, &messageGet) + require.NoError(t, err) if tt.valid { - require.NoError(t, err) + require.NotNil(t, reply.Reply) } else { - require.Error(t, err) + require.Nil(t, reply.Reply) } }) } @@ -169,9 +179,9 @@ func TestCacheGetWithoutSet(t *testing.T) { Finalized: tt.finalized, RequestedBlock: request.RequestBlock, } - _, err := cacheServer.GetRelay(ctx, &messageGet) - - require.Error(t, err) + reply, err := cacheServer.GetRelay(ctx, &messageGet) + require.Nil(t, reply.Reply) + require.NoError(t, err) }) } } @@ -333,7 +343,7 @@ func TestCacheSetGetLatest(t *testing.T) { require.Equal(t, cacheReply.GetReply().LatestBlock, latestBlockForRelay) } } else { - require.Error(t, err) + require.Nil(t, cacheReply.Reply) } }) } @@ -410,7 +420,7 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) { require.Equal(t, cacheReply.GetReply().LatestBlock, latestBlockForRelay) } } else { - require.Error(t, err) + require.Nil(t, cacheReply.Reply) } request2 := shallowCopy(request) @@ -435,8 +445,9 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) { RequestedBlock: request.RequestBlock, } // repeat our latest block get, this time we expect it to look for a newer block and fail - _, err = cacheServer.GetRelay(ctx, &messageGet) - require.Error(t, err) + reply, err := cacheServer.GetRelay(ctx, &messageGet) + require.NoError(t, err) + require.Nil(t, reply.Reply) }) } } @@ -462,7 
+473,7 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) { {name: "NonFinalized With Hash", valid: true, delay: time.Millisecond, finalized: false, hash: []byte{1, 2, 3}}, {name: "NonFinalized After delay With Hash", valid: true, delay: cache.DefaultExpirationForNonFinalized + time.Millisecond, finalized: false, hash: []byte{1, 2, 3}}, - // Null ID in get and set + // // Null ID in get and set {name: "Finalized No Hash, with null id in get and set", valid: true, delay: time.Millisecond, finalized: true, hash: nil, nullIdInGet: true, nullIdInSet: true}, {name: "Finalized After delay No Hash, with null id in get and set", valid: true, delay: cache.DefaultExpirationForNonFinalized + time.Millisecond, finalized: true, hash: nil, nullIdInGet: true, nullIdInSet: true}, {name: "NonFinalized No Hash, with null id in get and set", valid: true, delay: time.Millisecond, finalized: false, hash: nil, nullIdInGet: true, nullIdInSet: true}, @@ -472,7 +483,7 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) { {name: "NonFinalized With Hash, with null id in get and set", valid: true, delay: time.Millisecond, finalized: false, hash: []byte{1, 2, 3}, nullIdInGet: true, nullIdInSet: true}, {name: "NonFinalized After delay With Hash, with null id in get and set", valid: true, delay: cache.DefaultExpirationForNonFinalized + time.Millisecond, finalized: false, hash: []byte{1, 2, 3}, nullIdInGet: true, nullIdInSet: true}, - // Null ID only in get + // // Null ID only in get {name: "Finalized No Hash, with null id only in get", valid: true, delay: time.Millisecond, finalized: true, hash: nil, nullIdInGet: true}, {name: "Finalized After delay No Hash, with null id only in get", valid: true, delay: cache.DefaultExpirationForNonFinalized + time.Millisecond, finalized: true, hash: nil, nullIdInGet: true}, {name: "NonFinalized No Hash, with null id only in get", valid: true, delay: time.Millisecond, finalized: false, hash: nil, nullIdInGet: true}, @@ -482,7 +493,7 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) { {name: "NonFinalized With Hash, with null id only in get", valid: true, delay: time.Millisecond, finalized: false, hash: []byte{1, 2, 3}, nullIdInGet: true}, {name: "NonFinalized After delay With Hash, with null id only in get", valid: true, delay: cache.DefaultExpirationForNonFinalized + time.Millisecond, finalized: false, hash: []byte{1, 2, 3}, nullIdInGet: true}, - // Null ID only in set + // // Null ID only in set {name: "Finalized No Hash, with null id only in set", valid: true, delay: time.Millisecond, finalized: true, hash: nil, nullIdInSet: true}, {name: "Finalized After delay No Hash, with null id only in set", valid: true, delay: cache.DefaultExpirationForNonFinalized + time.Millisecond, finalized: true, hash: nil, nullIdInSet: true}, {name: "NonFinalized No Hash, with null id only in set", valid: true, delay: time.Millisecond, finalized: false, hash: nil, nullIdInSet: true}, @@ -547,20 +558,21 @@ func TestCacheSetGetJsonRPCWithID(t *testing.T) { } cacheReply, err := cacheServer.GetRelay(ctx, &messageGet) + // because we always need a cache reply. we cant return an error in any case. 
+ // grpc do not allow returning errors + messages + require.NoError(t, err) + if tt.valid { cacheReply.Reply.Data = outputFormatter(cacheReply.Reply.Data) - require.NoError(t, err) - result := gjson.GetBytes(cacheReply.GetReply().Data, format.IDFieldName) extractedID := result.Raw - if tt.nullIdInGet { require.Equal(t, "null", extractedID) } else { require.Equal(t, strconv.FormatInt(changedID, 10), extractedID) } } else { - require.Error(t, err) + require.Nil(t, cacheReply.Reply) } }) } @@ -583,7 +595,16 @@ func TestCacheExpirationMultiplier(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cs := cache.CacheServer{CacheMaxCost: 2 * 1024 * 1024 * 1024} - cs.InitCache(context.Background(), cache.DefaultExpirationTimeFinalized, cache.DefaultExpirationForNonFinalized, cache.DisabledFlagOption, 1, tt.multiplier) + cs.InitCache( + context.Background(), + cache.DefaultExpirationTimeFinalized, + cache.DefaultExpirationForNonFinalized, + cache.DefaultExpirationNodeErrors, + cache.DefaultExpirationBlocksHashesToHeights, + cache.DisabledFlagOption, + cache.DefaultExpirationTimeFinalizedMultiplier, + tt.multiplier, + ) cacheServer := &cache.RelayerCacheServer{CacheServer: &cs} durationActual := cacheServer.CacheServer.ExpirationForChain(cache.DefaultExpirationForNonFinalized) @@ -591,3 +612,246 @@ func TestCacheExpirationMultiplier(t *testing.T) { }) } } + +func TestCacheSetGetBlocksHashesToHeightsHappyFlow(t *testing.T) { + t.Parallel() + const ( + SET_INPUT int = iota + GET_INPUT + EXPECTED_FROM_GET + ) + + type step struct { + blockHashesToHeights []*pairingtypes.BlockHashToHeight + inputOrExpected int + } + + steps := []step{ + { + inputOrExpected: SET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{}, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H1"}, + {Hash: "H2"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: spectypes.NOT_APPLICABLE, + }, + { + Hash: "H2", + Height: spectypes.NOT_APPLICABLE, + }, + }, + }, + { + inputOrExpected: SET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: 1, + }, + }, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H1"}, + {Hash: "H2"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: 1, + }, + { + Hash: "H2", + Height: spectypes.NOT_APPLICABLE, + }, + }, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H1"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: 1, + }, + }, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H2"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H2", + Height: spectypes.NOT_APPLICABLE, + }, + }, + }, + { + inputOrExpected: SET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H3", + Height: 3, + }, + }, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H1"}, + {Hash: "H2"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { 
+ Hash: "H1", + Height: 1, + }, + { + Hash: "H2", + Height: spectypes.NOT_APPLICABLE, + }, + }, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H1"}, + {Hash: "H2"}, + {Hash: "H3"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: 1, + }, + { + Hash: "H2", + Height: spectypes.NOT_APPLICABLE, + }, + { + Hash: "H3", + Height: 3, + }, + }, + }, + { + inputOrExpected: SET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: 4, + }, + { + Hash: "H2", + Height: 2, + }, + { + Hash: "H5", + Height: 7, + }, + }, + }, + { + inputOrExpected: GET_INPUT, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + {Hash: "H1"}, + {Hash: "H2"}, + {Hash: "H3"}, + {Hash: "H5"}, + }, + }, + { + inputOrExpected: EXPECTED_FROM_GET, + blockHashesToHeights: []*pairingtypes.BlockHashToHeight{ + { + Hash: "H1", + Height: 4, + }, + { + Hash: "H2", + Height: 2, + }, + { + Hash: "H3", + Height: 3, + }, + { + Hash: "H5", + Height: 7, + }, + }, + }, + } + + t.Run("run cache steps", func(t *testing.T) { + ctx, cacheServer := initTest() + request := getRequest(1230, []byte(StubSig), StubApiInterface) + + var lastCacheResult []*pairingtypes.BlockHashToHeight + for stepNum, step := range steps { + switch step.inputOrExpected { + case SET_INPUT: + messageSet := pairingtypes.RelayCacheSet{ + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: []byte("123456789"), + ChainId: StubChainID, + Response: &pairingtypes.RelayReply{}, + Finalized: true, + RequestedBlock: request.RequestBlock, + BlocksHashesToHeights: step.blockHashesToHeights, + } + + _, err := cacheServer.SetRelay(ctx, &messageSet) + require.NoError(t, err, "step: %d", stepNum) + + // sleep to make sure it's in the cache + time.Sleep(3 * time.Millisecond) + case GET_INPUT: + messageGet := pairingtypes.RelayCacheGet{ + RequestHash: HashRequest(t, request, StubChainID), + BlockHash: []byte("123456789"), + ChainId: StubChainID, + Finalized: true, + RequestedBlock: request.RequestBlock, + BlocksHashesToHeights: step.blockHashesToHeights, + } + + cacheResult, err := cacheServer.GetRelay(ctx, &messageGet) + require.NoError(t, err, "step: %d", stepNum) + lastCacheResult = cacheResult.BlocksHashesToHeights + case EXPECTED_FROM_GET: + require.Equal(t, step.blockHashesToHeights, lastCacheResult, "step: %d", stepNum) + } + } + }) +} diff --git a/ecosystem/cache/command.go b/ecosystem/cache/command.go index 6a5c9972dd..c138a59134 100644 --- a/ecosystem/cache/command.go +++ b/ecosystem/cache/command.go @@ -44,6 +44,7 @@ longer DefaultExpirationForNonFinalized will reduce sync QoS for "latest" reques cacheCmd.Flags().Duration(ExpirationNonFinalizedFlagName, DefaultExpirationForNonFinalized, "how long does a cache entry lasts in the cache for a non finalized entry") cacheCmd.Flags().Float64(ExpirationTimeFinalizedMultiplierFlagName, DefaultExpirationTimeFinalizedMultiplier, "Multiplier for finalized cache entry expiration. 1 means no change (default), 1.2 means 20% longer.") cacheCmd.Flags().Float64(ExpirationTimeNonFinalizedMultiplierFlagName, DefaultExpirationTimeNonFinalizedMultiplier, "Multiplier for non-finalized cache entry expiration. 
1 means no change (default), 1.2 means 20% longer.") + cacheCmd.Flags().Duration(ExpirationBlocksHashesToHeightsFlagName, DefaultExpirationBlocksHashesToHeights, "how long does the cache entry lasts in the cache for a block hash to height entry") cacheCmd.Flags().Duration(ExpirationNodeErrorsOnFinalizedFlagName, DefaultExpirationNodeErrors, "how long does a cache entry lasts in the cache for a finalized node error entry") cacheCmd.Flags().String(FlagMetricsAddress, DisabledFlagOption, "address to listen to prometheus metrics 127.0.0.1:5555, later you can curl http://127.0.0.1:5555/metrics") cacheCmd.Flags().Int64(FlagCacheSizeName, 2*1024*1024*1024, "the maximal amount of entries to save") diff --git a/ecosystem/cache/handlers.go b/ecosystem/cache/handlers.go index 9d4c3a3f73..49cbd6adba 100644 --- a/ecosystem/cache/handlers.go +++ b/ecosystem/cache/handlers.go @@ -85,12 +85,34 @@ func (s *RelayerCacheServer) getSeenBlockForSharedStateMode(chainId string, shar return 0 } +func (s *RelayerCacheServer) getBlockHeightsFromHashes(chainId string, hashes []*pairingtypes.BlockHashToHeight) []*pairingtypes.BlockHashToHeight { + for _, hashToHeight := range hashes { + formattedKey := s.formatChainIdWithHashKey(chainId, hashToHeight.Hash) + value, found := getNonExpiredFromCache(s.CacheServer.blocksHashesToHeightsCache, formattedKey) + if found { + if cacheValue, ok := value.(int64); ok { + hashToHeight.Height = cacheValue + } + } else { + hashToHeight.Height = spectypes.NOT_APPLICABLE + } + } + return hashes +} + func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) { cacheReply := &pairingtypes.CacheRelayReply{} var cacheReplyTmp *pairingtypes.CacheRelayReply var err error var seenBlock int64 + // validating that if we had an error, we do not return a reply. + defer func() { + if err != nil { + cacheReply.Reply = nil + } + }() + originalRequestedBlock := relayCacheGet.RequestedBlock // save requested block prior to swap if originalRequestedBlock < 0 { // we need to fetch stored latest block information. getLatestBlock := s.getLatestBlock(latestBlockKey(relayCacheGet.ChainId, "")) @@ -104,11 +126,15 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin utils.Attribute{Key: "requested_block_parsed", Value: relayCacheGet.RequestedBlock}, utils.Attribute{Key: "seen_block", Value: relayCacheGet.SeenBlock}, ) + + var blockHashes []*pairingtypes.BlockHashToHeight if relayCacheGet.RequestedBlock >= 0 { // we can only fetch - // check seen block is larger than our requested block, we don't need to fetch seen block prior as its already larger than requested block + // we don't need to fetch seen block prior as its already larger than requested block waitGroup := sync.WaitGroup{} - waitGroup.Add(2) // currently we have two groups getRelayInner and getSeenBlock - // fetch all reads at the same time. + waitGroup.Add(3) // currently we have three groups: getRelayInner, getSeenBlock and getBlockHeightsFromHashes + + // fetch all reads at the same time: + // fetch the cache entry go func() { defer waitGroup.Done() cacheReplyTmp, err = s.getRelayInner(relayCacheGet) @@ -116,6 +142,8 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin cacheReply = cacheReplyTmp // set cache reply only if its not nil, as we need to store seen block in it. 
} }() + + // fetch seen block go func() { defer waitGroup.Done() // set seen block if required @@ -124,8 +152,16 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin relayCacheGet.SeenBlock = seenBlock // update state. } }() + + // fetch block hashes + go func() { + defer waitGroup.Done() + blockHashes = s.getBlockHeightsFromHashes(relayCacheGet.ChainId, relayCacheGet.BlocksHashesToHeights) + }() + // wait for all reads to complete before moving forward waitGroup.Wait() + if err == nil { // in case we got a hit validate seen block of the reply. // validate that the response seen block is larger or equal to our expectations. if cacheReply.SeenBlock < lavaslices.Min([]int64{relayCacheGet.SeenBlock, relayCacheGet.RequestedBlock}) { // TODO unitest this. @@ -138,6 +174,7 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin ) } } + // set seen block. if relayCacheGet.SeenBlock > cacheReply.SeenBlock { cacheReply.SeenBlock = relayCacheGet.SeenBlock @@ -148,22 +185,32 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin utils.LogAttr("requested block", relayCacheGet.RequestedBlock), utils.LogAttr("request_hash", string(relayCacheGet.RequestHash)), ) + // even if we don't have information on requested block, we can still check if we have data on the block hash array. + blockHashes = s.getBlockHeightsFromHashes(relayCacheGet.ChainId, relayCacheGet.BlocksHashesToHeights) + } + + cacheReply.BlocksHashesToHeights = blockHashes + if blockHashes != nil { + utils.LavaFormatDebug("block hashes:", utils.LogAttr("hashes", blockHashes)) } // add prometheus metrics asynchronously + cacheHit := cacheReply.Reply != nil go func() { cacheMetricsContext, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - var hit bool - if err != nil { - s.cacheMiss(cacheMetricsContext, err) - } else { - hit = true + + if cacheHit { s.cacheHit(cacheMetricsContext) + } else { + s.cacheMiss(cacheMetricsContext, err) } - s.CacheServer.CacheMetrics.AddApiSpecific(originalRequestedBlock, relayCacheGet.ChainId, hit) + + s.CacheServer.CacheMetrics.AddApiSpecific(originalRequestedBlock, relayCacheGet.ChainId, cacheHit) }() - return cacheReply, err + // no matter what we return nil from cache. as we need additional info even if we had cache miss + // such as block hashes array, seen block, etc... + return cacheReply, nil } // formatHashKey formats the hash key by adding latestBlock information. @@ -173,6 +220,10 @@ func (s *RelayerCacheServer) formatHashKey(hash []byte, parsedRequestedBlock int return hash } +func (s *RelayerCacheServer) formatChainIdWithHashKey(chainId, hash string) string { + return chainId + "_" + hash +} + func (s *RelayerCacheServer) getRelayInner(relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) { // cache key is compressed from: // 1. Request hash including all the information inside RelayPrivateData (Salt can cause issues if not dealt with on consumer side.) 
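The GetRelay hunk above changes the cache-read contract: a miss no longer surfaces as a gRPC error, the reply always carries SeenBlock plus the resolved BlocksHashesToHeights (with spectypes.NOT_APPLICABLE for hashes the cache does not know), and Reply stays nil when nothing was found. A minimal caller-side sketch of that contract follows; the cacheGetter interface, handleCacheLookup helper, and package name are illustrative assumptions, while the message types, field names, and the NOT_APPLICABLE sentinel come from this diff.

package cachedemo

import (
	"context"

	pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
	spectypes "github.com/lavanet/lava/v2/x/spec/types"
)

// cacheGetter mirrors the RelayerCacheServer.GetRelay signature from the hunk above.
type cacheGetter interface {
	GetRelay(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error)
}

// handleCacheLookup is an illustrative helper: it treats err as a transport/internal
// failure only, detects a miss via a nil Reply, and collects the hash->height results.
func handleCacheLookup(ctx context.Context, cache cacheGetter, get *pairingtypes.RelayCacheGet) (hit bool, heights map[string]int64, err error) {
	reply, err := cache.GetRelay(ctx, get)
	if err != nil {
		return false, nil, err // not a cache miss; misses now return a reply with a nil Reply field
	}
	hit = reply.Reply != nil
	heights = map[string]int64{}
	for _, h := range reply.BlocksHashesToHeights {
		if h.Height == spectypes.NOT_APPLICABLE {
			continue // the cache has no stored height for this hash yet
		}
		heights[h.Hash] = h.Height
	}
	return hit, heights, nil
}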
@@ -249,6 +300,15 @@ func (s *RelayerCacheServer) setSeenBlockOnSharedStateMode(chainId, sharedStateI s.performInt64WriteWithValidationAndRetry(get, set, seenBlock) } +func (s *RelayerCacheServer) setBlocksHashesToHeights(chainId string, blocksHashesToHeights []*pairingtypes.BlockHashToHeight) { + for _, hashToHeight := range blocksHashesToHeights { + if hashToHeight.Height >= 0 { + formattedKey := s.formatChainIdWithHashKey(chainId, hashToHeight.Hash) + s.CacheServer.blocksHashesToHeightsCache.SetWithTTL(formattedKey, hashToHeight.Height, 1, s.CacheServer.ExpirationBlocksHashesToHeights) + } + } +} + func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairingtypes.RelayCacheSet) (*emptypb.Empty, error) { if relayCacheSet.RequestedBlock < 0 { return nil, utils.LavaFormatError("invalid relay cache set data, request block is negative", nil, utils.Attribute{Key: "requestBlock", Value: relayCacheSet.RequestedBlock}) @@ -265,6 +325,7 @@ func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairin utils.Attribute{Key: "requestHash", Value: string(relayCacheSet.BlockHash)}, utils.Attribute{Key: "latestKnownBlock", Value: string(relayCacheSet.BlockHash)}, utils.Attribute{Key: "IsNodeError", Value: relayCacheSet.IsNodeError}, + utils.Attribute{Key: "BlocksHashesToHeights", Value: relayCacheSet.BlocksHashesToHeights}, ) // finalized entries can stay there if relayCacheSet.Finalized { @@ -282,6 +343,7 @@ func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairin // Setting the seen block for shared state. s.setSeenBlockOnSharedStateMode(relayCacheSet.ChainId, relayCacheSet.SharedStateId, latestKnownBlock) s.setLatestBlock(latestBlockKey(relayCacheSet.ChainId, ""), latestKnownBlock) + s.setBlocksHashesToHeights(relayCacheSet.ChainId, relayCacheSet.BlocksHashesToHeights) return &emptypb.Empty{}, nil } diff --git a/ecosystem/cache/server.go b/ecosystem/cache/server.go index 53fcce34b2..d5e34e9717 100644 --- a/ecosystem/cache/server.go +++ b/ecosystem/cache/server.go @@ -25,47 +25,67 @@ import ( ) const ( - ExpirationFlagName = "expiration" - ExpirationTimeFinalizedMultiplierFlagName = "expiration-multiplier" - ExpirationNonFinalizedFlagName = "expiration-non-finalized" - ExpirationTimeNonFinalizedMultiplierFlagName = "expiration-non-finalized-multiplier" - ExpirationNodeErrorsOnFinalizedFlagName = "expiration-finalized-node-errors" - FlagCacheSizeName = "max-items" - DefaultExpirationForNonFinalized = 500 * time.Millisecond - DefaultExpirationTimeFinalizedMultiplier = 1.0 - DefaultExpirationTimeNonFinalizedMultiplier = 1.0 - DefaultExpirationTimeFinalized = time.Hour - DefaultExpirationNodeErrors = 250 * time.Millisecond - CacheNumCounters = 100000000 // expect 10M items - unixPrefix = "unix:" + ExpirationFlagName = "expiration" + ExpirationTimeFinalizedMultiplierFlagName = "expiration-multiplier" + ExpirationNonFinalizedFlagName = "expiration-non-finalized" + ExpirationTimeNonFinalizedMultiplierFlagName = "expiration-non-finalized-multiplier" + ExpirationBlocksHashesToHeightsFlagName = "expiration-blocks-hashes-to-heights" + ExpirationNodeErrorsOnFinalizedFlagName = "expiration-finalized-node-errors" + FlagCacheSizeName = "max-items" + DefaultExpirationForNonFinalized = 500 * time.Millisecond + DefaultExpirationTimeFinalizedMultiplier = 1.0 + DefaultExpirationTimeNonFinalizedMultiplier = 1.0 + DefaultExpirationBlocksHashesToHeights = 48 * time.Hour + DefaultExpirationBlocksHashesToHeightsMultiplier = 1.0 + DefaultExpirationTimeFinalized = 
time.Hour + DefaultExpirationNodeErrors = 250 * time.Millisecond + CacheNumCounters = 100000000 // expect 10M items + unixPrefix = "unix:" ) type CacheServer struct { - finalizedCache *ristretto.Cache - tempCache *ristretto.Cache - ExpirationFinalized time.Duration - ExpirationNonFinalized time.Duration - ExpirationNodeErrors time.Duration + finalizedCache *ristretto.Cache + tempCache *ristretto.Cache // cache for temporary inputs, such as latest blocks + blocksHashesToHeightsCache *ristretto.Cache + ExpirationFinalized time.Duration + ExpirationNonFinalized time.Duration + ExpirationNodeErrors time.Duration + ExpirationBlocksHashesToHeights time.Duration CacheMetrics *CacheMetrics CacheMaxCost int64 } -func (cs *CacheServer) InitCache(ctx context.Context, expiration time.Duration, expirationNonFinalized time.Duration, metricsAddr string, expirationFinalizedMultiplier float64, expirationNonFinalizedMultiplier float64) { +func (cs *CacheServer) InitCache( + ctx context.Context, + expiration time.Duration, + expirationNonFinalized time.Duration, + expirationNodeErrorsOnFinalized time.Duration, + expirationBlocksHashesToHeights time.Duration, + metricsAddr string, + expirationFinalizedMultiplier float64, + expirationNonFinalizedMultiplier float64, +) { cs.ExpirationFinalized = time.Duration(float64(expiration) * expirationFinalizedMultiplier) cs.ExpirationNonFinalized = time.Duration(float64(expirationNonFinalized) * expirationNonFinalizedMultiplier) + cs.ExpirationNodeErrors = expirationNodeErrorsOnFinalized + cs.ExpirationBlocksHashesToHeights = time.Duration(float64(expirationBlocksHashesToHeights)) - cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: cs.CacheMaxCost, BufferItems: 64}) + var err error + cs.tempCache, err = ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: cs.CacheMaxCost, BufferItems: 64}) if err != nil { utils.LavaFormatFatal("could not create cache", err) } - cs.tempCache = cache - cache, err = ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: cs.CacheMaxCost, BufferItems: 64}) + cs.finalizedCache, err = ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: cs.CacheMaxCost, BufferItems: 64}) if err != nil { utils.LavaFormatFatal("could not create finalized cache", err) } - cs.finalizedCache = cache + + cs.blocksHashesToHeightsCache, err = ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: cs.CacheMaxCost, BufferItems: 64}) + if err != nil { + utils.LavaFormatFatal("could not create blocks hashes to heights cache", err) + } // initialize prometheus cs.CacheMetrics = NewCacheMetricsServer(metricsAddr) @@ -183,6 +203,16 @@ func Server( utils.LavaFormatFatal("failed to read flag", err, utils.Attribute{Key: "flag", Value: ExpirationNonFinalizedFlagName}) } + expirationNodeErrorsOnFinalizedFlagName, err := flags.GetDuration(ExpirationNodeErrorsOnFinalizedFlagName) + if err != nil { + utils.LavaFormatFatal("failed to read flag", err, utils.Attribute{Key: "flag", Value: ExpirationNodeErrorsOnFinalizedFlagName}) + } + + expirationBlocksHashesToHeights, err := flags.GetDuration(ExpirationBlocksHashesToHeightsFlagName) + if err != nil { + utils.LavaFormatFatal("failed to read flag", err, utils.Attribute{Key: "flag", Value: ExpirationBlocksHashesToHeightsFlagName}) + } + expirationFinalizedMultiplier, err := flags.GetFloat64(ExpirationTimeFinalizedMultiplierFlagName) if err != nil { utils.LavaFormatFatal("failed to read flag", err, 
utils.Attribute{Key: "flag", Value: ExpirationTimeFinalizedMultiplierFlagName}) @@ -199,7 +229,7 @@ func Server( } cs := CacheServer{CacheMaxCost: cacheMaxCost} - cs.InitCache(ctx, expiration, expirationNonFinalized, metricsAddr, expirationFinalizedMultiplier, expirationNonFinalizedMultiplier) + cs.InitCache(ctx, expiration, expirationNonFinalized, expirationNodeErrorsOnFinalizedFlagName, expirationBlocksHashesToHeights, metricsAddr, expirationFinalizedMultiplier, expirationNonFinalizedMultiplier) // TODO: have a state tracker cs.Serve(ctx, listenAddr) } diff --git a/protocol/chainlib/chain_message.go b/protocol/chainlib/chain_message.go index 9df468f55c..0b438206d3 100644 --- a/protocol/chainlib/chain_message.go +++ b/protocol/chainlib/chain_message.go @@ -2,12 +2,15 @@ package chainlib import ( "math" + "sort" "time" "github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcInterfaceMessages" "github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient" "github.com/lavanet/lava/v2/protocol/chainlib/extensionslib" + "github.com/lavanet/lava/v2/protocol/lavasession" "github.com/lavanet/lava/v2/utils" + "github.com/lavanet/lava/v2/utils/lavaslices" pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" spectypes "github.com/lavanet/lava/v2/x/spec/types" ) @@ -38,102 +41,134 @@ type baseChainMessageContainer struct { resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string) } -func (bcnc *baseChainMessageContainer) SubscriptionIdExtractor(reply *rpcclient.JsonrpcMessage) string { - return bcnc.msg.SubscriptionIdExtractor(reply) +// Used to create the key for used providers so all extensions are +// always in the same order. e.g. "archive;ws" +func (bcmc *baseChainMessageContainer) sortExtensions() { + if len(bcmc.extensions) <= 1 { // nothing to sort + return + } + + sort.SliceStable(bcmc.extensions, func(i, j int) bool { + return bcmc.extensions[i].Name < bcmc.extensions[j].Name + }) +} + +func (bcmc *baseChainMessageContainer) GetRequestedBlocksHashes() []string { + return bcmc.requestedBlockHashes +} + +func (bcmc *baseChainMessageContainer) SubscriptionIdExtractor(reply *rpcclient.JsonrpcMessage) string { + return bcmc.msg.SubscriptionIdExtractor(reply) } // returning parse directive for the api. can be nil. -func (bcnc *baseChainMessageContainer) GetParseDirective() *spectypes.ParseDirective { - return bcnc.parseDirective +func (bcmc *baseChainMessageContainer) GetParseDirective() *spectypes.ParseDirective { + return bcmc.parseDirective } -func (pm *baseChainMessageContainer) GetRawRequestHash() ([]byte, error) { - if len(pm.inputHashCache) > 0 { +func (bcmc *baseChainMessageContainer) GetRawRequestHash() ([]byte, error) { + if len(bcmc.inputHashCache) > 0 { // Get the cached value - return pm.inputHashCache, nil + return bcmc.inputHashCache, nil } - hash, err := pm.msg.GetRawRequestHash() + hash, err := bcmc.msg.GetRawRequestHash() if err == nil { // Now we have the hash cached so we call it only once. - pm.inputHashCache = hash + bcmc.inputHashCache = hash } return hash, err } // not necessary for base chain message. 
-func (bcnc *baseChainMessageContainer) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) { - if bcnc.resultErrorParsingMethod == nil { +func (bcmc *baseChainMessageContainer) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) { + if bcmc.resultErrorParsingMethod == nil { utils.LavaFormatError("tried calling resultErrorParsingMethod when it is not set", nil) return false, "" } - return bcnc.resultErrorParsingMethod(data, httpStatusCode) + return bcmc.resultErrorParsingMethod(data, httpStatusCode) } -func (bcnc *baseChainMessageContainer) TimeoutOverride(override ...time.Duration) time.Duration { +func (bcmc *baseChainMessageContainer) TimeoutOverride(override ...time.Duration) time.Duration { if len(override) > 0 { - bcnc.timeoutOverride = override[0] + bcmc.timeoutOverride = override[0] } - return bcnc.timeoutOverride + return bcmc.timeoutOverride +} + +func (bcmc *baseChainMessageContainer) SetForceCacheRefresh(force bool) bool { + bcmc.forceCacheRefresh = force + return bcmc.forceCacheRefresh } -func (bcnc *baseChainMessageContainer) SetForceCacheRefresh(force bool) bool { - bcnc.forceCacheRefresh = force - return bcnc.forceCacheRefresh +func (bcmc *baseChainMessageContainer) GetForceCacheRefresh() bool { + return bcmc.forceCacheRefresh } -func (bcnc *baseChainMessageContainer) GetForceCacheRefresh() bool { - return bcnc.forceCacheRefresh +func (bcmc *baseChainMessageContainer) DisableErrorHandling() { + bcmc.msg.DisableErrorHandling() } -func (bcnc *baseChainMessageContainer) DisableErrorHandling() { - bcnc.msg.DisableErrorHandling() +func (bcmc baseChainMessageContainer) AppendHeader(metadata []pairingtypes.Metadata) { + bcmc.msg.AppendHeader(metadata) } -func (bcnc baseChainMessageContainer) AppendHeader(metadata []pairingtypes.Metadata) { - bcnc.msg.AppendHeader(metadata) +func (bcmc baseChainMessageContainer) GetApi() *spectypes.Api { + return bcmc.api } -func (bcnc baseChainMessageContainer) GetApi() *spectypes.Api { - return bcnc.api +func (bcmc baseChainMessageContainer) GetApiCollection() *spectypes.ApiCollection { + return bcmc.apiCollection } -func (bcnc baseChainMessageContainer) GetApiCollection() *spectypes.ApiCollection { - return bcnc.apiCollection +func (bcmc *baseChainMessageContainer) UpdateEarliestInMessage(incomingEarliest int64) bool { + swapped := false + if bcmc.earliestRequestedBlock != spectypes.EARLIEST_BLOCK { + // check earliest is not unset (0) or incoming is lower than current value + if bcmc.earliestRequestedBlock == 0 || bcmc.earliestRequestedBlock > incomingEarliest { + bcmc.earliestRequestedBlock = incomingEarliest + swapped = true + } + } + return swapped } -func (bcnc baseChainMessageContainer) RequestedBlock() (latest int64, earliest int64) { - if bcnc.earliestRequestedBlock == 0 { +func (bcmc *baseChainMessageContainer) RequestedBlock() (latest int64, earliest int64) { + if bcmc.earliestRequestedBlock == 0 { // earliest is optional and not set here - return bcnc.latestRequestedBlock, bcnc.latestRequestedBlock + return bcmc.latestRequestedBlock, bcmc.latestRequestedBlock } - return bcnc.latestRequestedBlock, bcnc.earliestRequestedBlock + return bcmc.latestRequestedBlock, bcmc.earliestRequestedBlock } -func (bcnc baseChainMessageContainer) GetRPCMessage() rpcInterfaceMessages.GenericMessage { - return bcnc.msg +func (bcmc baseChainMessageContainer) GetRPCMessage() rpcInterfaceMessages.GenericMessage { + return bcmc.msg } -func (bcnc *baseChainMessageContainer) 
UpdateLatestBlockInMessage(latestBlock int64, modifyContent bool) (modifiedOnLatestReq bool) { - requestedBlock, _ := bcnc.RequestedBlock() +func (bcmc *baseChainMessageContainer) UpdateLatestBlockInMessage(latestBlock int64, modifyContent bool) (modifiedOnLatestReq bool) { + requestedBlock, _ := bcmc.RequestedBlock() if latestBlock <= spectypes.NOT_APPLICABLE || requestedBlock != spectypes.LATEST_BLOCK { return false } - success := bcnc.msg.UpdateLatestBlockInMessage(uint64(latestBlock), modifyContent) + success := bcmc.msg.UpdateLatestBlockInMessage(uint64(latestBlock), modifyContent) if success { - bcnc.latestRequestedBlock = latestBlock + bcmc.latestRequestedBlock = latestBlock return true } return false } -func (bcnc *baseChainMessageContainer) GetExtensions() []*spectypes.Extension { - return bcnc.extensions +func (bcmc *baseChainMessageContainer) GetExtensions() []*spectypes.Extension { + return bcmc.extensions +} + +func (bcmc *baseChainMessageContainer) GetConcatenatedExtensions() string { + return string(lavasession.NewRouterKeyFromExtensions(bcmc.extensions)) } // adds the following extensions -func (bcnc *baseChainMessageContainer) OverrideExtensions(extensionNames []string, extensionParser *extensionslib.ExtensionParser) { +func (bcmc *baseChainMessageContainer) OverrideExtensions(extensionNames []string, extensionParser *extensionslib.ExtensionParser) { existingExtensions := map[string]struct{}{} - for _, extension := range bcnc.extensions { + for _, extension := range bcmc.extensions { existingExtensions[extension.Name] = struct{}{} } for _, extensionName := range extensionNames { @@ -141,38 +176,57 @@ func (bcnc *baseChainMessageContainer) OverrideExtensions(extensionNames []strin existingExtensions[extensionName] = struct{}{} extensionKey := extensionslib.ExtensionKey{ Extension: extensionName, - ConnectionType: bcnc.apiCollection.CollectionData.Type, - InternalPath: bcnc.apiCollection.CollectionData.InternalPath, - Addon: bcnc.apiCollection.CollectionData.AddOn, + ConnectionType: bcmc.apiCollection.CollectionData.Type, + InternalPath: bcmc.apiCollection.CollectionData.InternalPath, + Addon: bcmc.apiCollection.CollectionData.AddOn, } extension := extensionParser.GetExtension(extensionKey) if extension != nil { - bcnc.extensions = append(bcnc.extensions, extension) - bcnc.updateCUForApi(extension) + bcmc.extensions = append(bcmc.extensions, extension) + bcmc.addExtensionCu(extension) + bcmc.sortExtensions() } } } } -func (bcnc *baseChainMessageContainer) SetExtension(extension *spectypes.Extension) { - if len(bcnc.extensions) > 0 { - for _, ext := range bcnc.extensions { +func (bcmc *baseChainMessageContainer) SetExtension(extension *spectypes.Extension) { + if len(bcmc.extensions) > 0 { + for _, ext := range bcmc.extensions { if ext.Name == extension.Name { // already existing, no need to add return } } - bcnc.extensions = append(bcnc.extensions, extension) + bcmc.extensions = append(bcmc.extensions, extension) + bcmc.sortExtensions() } else { - bcnc.extensions = []*spectypes.Extension{extension} + bcmc.extensions = []*spectypes.Extension{extension} } - bcnc.updateCUForApi(extension) + bcmc.addExtensionCu(extension) } -func (bcnc *baseChainMessageContainer) updateCUForApi(extension *spectypes.Extension) { - copyApi := *bcnc.api // we can't modify this because it points to an object inside the chainParser +func (bcmc *baseChainMessageContainer) RemoveExtension(extensionName string) { + for _, ext := range bcmc.extensions { + if ext.Name == extensionName { + bcmc.extensions, 
_ = lavaslices.Remove(bcmc.extensions, ext) + bcmc.removeExtensionCu(ext) + break + } + } + bcmc.sortExtensions() +} + +func (bcmc *baseChainMessageContainer) addExtensionCu(extension *spectypes.Extension) { + copyApi := *bcmc.api // we can't modify this because it points to an object inside the chainParser copyApi.ComputeUnits = uint64(math.Floor(float64(extension.GetCuMultiplier()) * float64(copyApi.ComputeUnits))) - bcnc.api = ©Api + bcmc.api = ©Api +} + +func (bcmc *baseChainMessageContainer) removeExtensionCu(extension *spectypes.Extension) { + copyApi := *bcmc.api // we can't modify this because it points to an object inside the chainParser + copyApi.ComputeUnits = uint64(math.Floor(float64(copyApi.ComputeUnits) / float64(extension.GetCuMultiplier()))) + bcmc.api = ©Api } type CraftData struct { diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 83aa8e1e30..b22ee97a2a 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -80,7 +80,10 @@ type ChainMessage interface { RequestedBlock() (latest int64, earliest int64) UpdateLatestBlockInMessage(latestBlock int64, modifyContent bool) (modified bool) AppendHeader(metadata []pairingtypes.Metadata) + SetExtension(extension *spectypes.Extension) GetExtensions() []*spectypes.Extension + GetConcatenatedExtensions() string + RemoveExtension(extensionName string) OverrideExtensions(extensionNames []string, extensionParser *extensionslib.ExtensionParser) DisableErrorHandling() TimeoutOverride(...time.Duration) time.Duration @@ -88,6 +91,8 @@ type ChainMessage interface { SetForceCacheRefresh(force bool) bool CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) GetRawRequestHash() ([]byte, error) + GetRequestedBlocksHashes() []string + UpdateEarliestInMessage(incomingEarliest int64) bool ChainMessageForSend } diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 02de8604e5..eca308f27b 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -151,7 +151,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes firstReply, repliesChan, err = manager.StartSubscription(ctx, chainMessage1, nil, nil, dapp, ip, uniqueIdentifiers[index], nil) go func() { for subMsg := range repliesChan { - utils.LavaFormatInfo("got reply for index", utils.LogAttr("index", index)) + // utils.LavaFormatDebug("got reply for index", utils.LogAttr("index", index)) require.Equal(t, string(play.subscriptionFirstReply1), string(subMsg.Data)) } }() diff --git a/protocol/chainlib/extensionslib/extension_parser.go b/protocol/chainlib/extensionslib/extension_parser.go index f9ddcbdea0..532f1ed011 100644 --- a/protocol/chainlib/extensionslib/extension_parser.go +++ b/protocol/chainlib/extensionslib/extension_parser.go @@ -1,9 +1,14 @@ package extensionslib import ( + "github.com/lavanet/lava/v2/utils/maps" spectypes "github.com/lavanet/lava/v2/x/spec/types" ) +const ( + ExtensionTypeArchive = "archive" +) + type ExtensionInfo struct { ExtensionOverride []string LatestBlock uint64 @@ -31,6 +36,23 @@ type ExtensionParser struct { configuredExtensions map[ExtensionKey]*spectypes.Extension } +func (ep *ExtensionParser) GetExtensionByName(extensionName string) *spectypes.Extension { + if extensionName == "" { + return nil + } + + findExtensionsPredicate := func(key ExtensionKey, _ *spectypes.Extension) bool 
{ + return key.Extension == extensionName + } + + _, archiveExt, found := maps.FindInMap(ep.configuredExtensions, findExtensionsPredicate) + if found { + return archiveExt + } + + return nil +} + func (ep *ExtensionParser) GetExtension(extension ExtensionKey) *spectypes.Extension { if extension.Extension == "" { return nil @@ -58,9 +80,9 @@ func (ep *ExtensionParser) SetConfiguredExtensions(configuredExtensions map[Exte ep.configuredExtensions = configuredExtensions } -func (ep *ExtensionParser) ExtensionParsing(addon string, extensionsChainMessage ExtensionsChainMessage, latestBlock uint64) { +func (ep *ExtensionParser) ExtensionParsing(addon string, extensionsChainMessage ExtensionsChainMessage, latestBlock uint64) bool { if len(ep.configuredExtensions) == 0 { - return + return false } for extensionKey, extension := range ep.configuredExtensions { @@ -71,13 +93,15 @@ func (ep *ExtensionParser) ExtensionParsing(addon string, extensionsChainMessage extensionParserRule := NewExtensionParserRule(extension) if extensionParserRule.isPassingRule(extensionsChainMessage, latestBlock) { extensionsChainMessage.SetExtension(extension) + return true } } + return false } func NewExtensionParserRule(extension *spectypes.Extension) ExtensionParserRule { switch extension.Name { - case "archive": + case ExtensionTypeArchive: return ArchiveParserRule{extension: extension} default: // unsupported rule diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 19a64487d2..d7daa95e59 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -32,6 +32,7 @@ const ( // Disable relay retries when we get node errors. // This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains. DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error" + UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain ) const ( @@ -55,6 +56,7 @@ type ConsumerCmdFlags struct { DebugRelays bool // enables debug mode for relays DisableConflictTransactions bool // disable conflict transactions DisableRetryOnNodeErrors bool // disable retries on node errors + OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. } // default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time. 
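The extension_parser.go hunk above adds a name-only lookup (GetExtensionByName) next to the key-based GetExtension, and ExtensionParsing now reports whether any rule attached an extension. A small sketch of the name-only lookup follows; the wrapper function, package name, and key values are illustrative assumptions, while the types, methods, and the ExtensionTypeArchive constant come from this diff.

package extdemo

import (
	"github.com/lavanet/lava/v2/protocol/chainlib/extensionslib"
	spectypes "github.com/lavanet/lava/v2/x/spec/types"
)

// configuredArchiveExtension returns the configured archive extension by name,
// regardless of which ConnectionType/InternalPath/Addon it was keyed under.
func configuredArchiveExtension() *spectypes.Extension {
	parser := extensionslib.ExtensionParser{}
	parser.SetConfiguredExtensions(map[extensionslib.ExtensionKey]*spectypes.Extension{
		// illustrative key values; only the Extension field matters for GetExtensionByName
		{Extension: extensionslib.ExtensionTypeArchive, ConnectionType: "jsonrpc"}: {
			Name:         extensionslib.ExtensionTypeArchive,
			CuMultiplier: 5,
		},
	})
	// returns nil when no extension with that name was configured
	return parser.GetExtensionByName(extensionslib.ExtensionTypeArchive)
}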
diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 6025f6bcb0..f9a3afbccf 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -33,6 +33,7 @@ const ( EXTENSION_OVERRIDE_HEADER_NAME = "lava-extension" FORCE_CACHE_REFRESH_HEADER_NAME = "lava-force-cache-refresh" LAVA_DEBUG_RELAY = "lava-debug-relay" + LAVA_EXTENSION_FORCED = "lava-extension-forced" LAVA_LB_UNIQUE_ID_HEADER = "lava-lb-unique-id" // send http request to /lava/health to see if the process is up - (ret code 200) DEFAULT_HEALTH_PATH = "/lava/health" diff --git a/protocol/integration/mocks.go b/protocol/integration/mocks.go index f1e31746a5..28fa1c2617 100644 --- a/protocol/integration/mocks.go +++ b/protocol/integration/mocks.go @@ -14,6 +14,7 @@ import ( "github.com/lavanet/lava/v2/protocol/lavasession" "github.com/lavanet/lava/v2/protocol/rpcprovider" "github.com/lavanet/lava/v2/protocol/rpcprovider/reliabilitymanager" + "github.com/lavanet/lava/v2/protocol/statetracker" "github.com/lavanet/lava/v2/protocol/statetracker/updaters" "github.com/lavanet/lava/v2/utils" conflicttypes "github.com/lavanet/lava/v2/x/conflict/types" @@ -34,7 +35,7 @@ func (m *mockConsumerStateTracker) RegisterForVersionUpdates(ctx context.Context func (m *mockConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager) { } -func (m *mockConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error { +func (m *mockConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint, offlineSpecOptions *statetracker.OfflineSpecOptions) error { return nil } diff --git a/protocol/lavasession/router_key.go b/protocol/lavasession/router_key.go index 441bdc6660..4edaed23e0 100644 --- a/protocol/lavasession/router_key.go +++ b/protocol/lavasession/router_key.go @@ -3,6 +3,8 @@ package lavasession import ( "sort" "strings" + + spectypes "github.com/lavanet/lava/v2/x/spec/types" ) const ( @@ -11,18 +13,31 @@ const ( type RouterKey string +func newRouterKeyInner(uniqueExtensions map[string]struct{}) RouterKey { + uniqueExtensionsSlice := []string{} + for addon := range uniqueExtensions { // we are sorting this anyway so we don't have to keep order + uniqueExtensionsSlice = append(uniqueExtensionsSlice, addon) + } + sort.Strings(uniqueExtensionsSlice) + return RouterKey(sep + strings.Join(uniqueExtensionsSlice, sep) + sep) +} + func NewRouterKey(extensions []string) RouterKey { // make sure addons have no repetitions uniqueExtensions := map[string]struct{}{} for _, extension := range extensions { uniqueExtensions[extension] = struct{}{} } - uniqueExtensionsSlice := []string{} - for addon := range uniqueExtensions { // we are sorting this anyway so we don't have to keep order - uniqueExtensionsSlice = append(uniqueExtensionsSlice, addon) + return newRouterKeyInner(uniqueExtensions) +} + +func NewRouterKeyFromExtensions(extensions []*spectypes.Extension) RouterKey { + // make sure addons have no repetitions + uniqueExtensions := map[string]struct{}{} + for _, extension := range extensions { + uniqueExtensions[extension.Name] = struct{}{} } - sort.Strings(uniqueExtensionsSlice) - return RouterKey(sep + strings.Join(uniqueExtensionsSlice, sep) + sep) + return newRouterKeyInner(uniqueExtensions) } func GetEmptyRouterKey() RouterKey { diff --git 
a/protocol/lavasession/used_providers.go b/protocol/lavasession/used_providers.go index 854e4823ac..3476549ed3 100644 --- a/protocol/lavasession/used_providers.go +++ b/protocol/lavasession/used_providers.go @@ -10,7 +10,14 @@ import ( "github.com/lavanet/lava/v2/utils" ) -const MaximumNumberOfSelectionLockAttempts = 500 +const ( + MaximumNumberOfSelectionLockAttempts = 500 + DefaultExtensionsKey = "" +) + +func NewDefaultUsedProvidersMap(directiveHeaders map[string]string) map[string]*UsedProviders { + return map[string]*UsedProviders{DefaultExtensionsKey: NewUsedProviders(directiveHeaders)} +} func NewUsedProviders(directiveHeaders map[string]string) *UsedProviders { unwantedProviders := map[string]struct{}{} diff --git a/protocol/parser/parser_test.go b/protocol/parser/parser_test.go index c08db60dd8..916c6a5a1c 100644 --- a/protocol/parser/parser_test.go +++ b/protocol/parser/parser_test.go @@ -563,7 +563,6 @@ func TestParseBlockFromParams(t *testing.T) { }, expected: spectypes.LATEST_BLOCK, }, - { name: "generic_parser_no_generic_parser", rpcInput: &RPCInputTest{ diff --git a/protocol/rpcconsumer/consumer_state_tracker_mock.go b/protocol/rpcconsumer/consumer_state_tracker_mock.go index 7fa930f992..75fb0c879c 100644 --- a/protocol/rpcconsumer/consumer_state_tracker_mock.go +++ b/protocol/rpcconsumer/consumer_state_tracker_mock.go @@ -16,6 +16,7 @@ import ( common "github.com/lavanet/lava/v2/protocol/common" finalizationconsensus "github.com/lavanet/lava/v2/protocol/lavaprotocol/finalizationconsensus" lavasession "github.com/lavanet/lava/v2/protocol/lavasession" + statetracker "github.com/lavanet/lava/v2/protocol/statetracker" updaters "github.com/lavanet/lava/v2/protocol/statetracker/updaters" types "github.com/lavanet/lava/v2/x/conflict/types" types0 "github.com/lavanet/lava/v2/x/plans/types" @@ -129,7 +130,7 @@ func (mr *MockConsumerStateTrackerInfMockRecorder) RegisterForDowntimeParamsUpda } // RegisterForSpecUpdates mocks base method. 
-func (m *MockConsumerStateTrackerInf) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error { +func (m *MockConsumerStateTrackerInf) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint, offlineSpecOptions *statetracker.OfflineSpecOptions) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RegisterForSpecUpdates", ctx, specUpdatable, endpoint) ret0, _ := ret[0].(error) diff --git a/protocol/rpcconsumer/relay_extension_manager.go b/protocol/rpcconsumer/relay_extension_manager.go new file mode 100644 index 0000000000..d3e2eeec6d --- /dev/null +++ b/protocol/rpcconsumer/relay_extension_manager.go @@ -0,0 +1,80 @@ +package rpcconsumer + +import ( + "github.com/lavanet/lava/v2/protocol/chainlib" + common "github.com/lavanet/lava/v2/protocol/common" + "github.com/lavanet/lava/v2/utils" + "github.com/lavanet/lava/v2/utils/lavaslices" + pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" + spectypes "github.com/lavanet/lava/v2/x/spec/types" +) + +type RelayExtensionManager struct { + chainMessage chainlib.ChainMessage + relayRequestData *pairingtypes.RelayPrivateData + managedExtension *spectypes.Extension + extensionWasActiveOriginally bool +} + +func NewRelayExtensionManager(chainMessage chainlib.ChainMessage, relayRequestData *pairingtypes.RelayPrivateData, managedExtension *spectypes.Extension) *RelayExtensionManager { + relayExtensionManager := &RelayExtensionManager{ + chainMessage: chainMessage, + relayRequestData: relayRequestData, + managedExtension: managedExtension, + } + relayExtensionManager.SetExtensionWasActiveOriginally() + return relayExtensionManager +} + +func (rem *RelayExtensionManager) SetExtensionWasActiveOriginally() { + if lavaslices.ContainsPredicate(rem.chainMessage.GetExtensions(), rem.matchManagedExtension) { + rem.extensionWasActiveOriginally = true + } +} + +func (rem *RelayExtensionManager) matchManagedExtension(extension *spectypes.Extension) bool { + return extension.Name == rem.managedExtension.Name +} + +func (rem *RelayExtensionManager) GetManagedExtensionName() string { + return rem.managedExtension.Name +} + +func (rem *RelayExtensionManager) SetManagedExtension() { + if rem.extensionWasActiveOriginally { + return // no need to set the extension as it was originally enabled + } + + // set extensions on both relayRequestData and chainMessage. 
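+ // note: SetExtension (chain_message.go) also bumps the api CU via addExtensionCu and re-sorts the
+ // extension list, so relayRequestData.Extensions is re-synced below via SetRelayDataExtensionsToChainMessageValues.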
+ rem.chainMessage.SetExtension(rem.managedExtension) + if lavaslices.Contains(rem.relayRequestData.Extensions, rem.managedExtension.Name) { + utils.LavaFormatError("relayRequestData already contains extension", nil, + utils.LogAttr("rem.relayRequestData.Extensions", rem.relayRequestData.Extensions), + utils.LogAttr("rem.managedExtension.Name", rem.managedExtension.Name), + ) + } + // reset extension names to currently supported extensions + rem.SetRelayDataExtensionsToChainMessageValues() +} + +func (rem *RelayExtensionManager) SetRelayDataExtensionsToChainMessageValues() { + rem.relayRequestData.Extensions = common.GetExtensionNames(rem.chainMessage.GetExtensions()) +} + +func (rem *RelayExtensionManager) IsExtensionActiveByDefault() bool { + return rem.extensionWasActiveOriginally +} + +func (rem *RelayExtensionManager) RemoveManagedExtension() { + if !rem.extensionWasActiveOriginally { + rem.chainMessage.RemoveExtension(rem.managedExtension.Name) + if !lavaslices.Contains(rem.relayRequestData.Extensions, rem.managedExtension.Name) { + utils.LavaFormatError("Asked to remove missing extension from relay request data", nil, + utils.LogAttr("rem.relayRequestData.Extensions", rem.relayRequestData.Extensions), + utils.LogAttr("rem.managedExtension.Name", rem.managedExtension.Name), + ) + } + // reset extension names to currently supported extensions + rem.SetRelayDataExtensionsToChainMessageValues() + } +} diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index f18b276663..86f1e337ec 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -15,12 +15,14 @@ import ( "github.com/lavanet/lava/v2/protocol/common" "github.com/lavanet/lava/v2/protocol/lavasession" "github.com/lavanet/lava/v2/utils" + pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" spectypes "github.com/lavanet/lava/v2/x/spec/types" ) const ( - MaxCallsPerRelay = 50 - NumberOfRetriesAllowedOnNodeErrors = 2 // we will try maximum additional 2 relays on node errors + MaxCallsPerRelay = 50 + NumberOfRetriesAllowedOnNodeErrors = 2 // we will try maximum additional 2 relays on node errors + NumberOfRetriesAllowedOnNodeErrorsForArchiveExtension = 1 // we will try maximum additional 1 relay on node errors for archive nodes ) type Selection int @@ -41,7 +43,7 @@ type chainIdAndApiInterfaceGetter interface { } type RelayProcessor struct { - usedProviders *lavasession.UsedProviders + usedProviders map[string]*lavasession.UsedProviders responses chan *relayResponse requiredSuccesses int nodeResponseErrors RelayErrors @@ -61,11 +63,14 @@ type RelayProcessor struct { chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter disableRelayRetry bool relayRetriesManager *RelayRetriesManager + relayExtensionManager *RelayExtensionManager + userReturnHeaders []pairingtypes.Metadata + directiveHeaders map[string]string } func NewRelayProcessor( ctx context.Context, - usedProviders *lavasession.UsedProviders, + usedProviders map[string]*lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, @@ -76,6 +81,8 @@ func NewRelayProcessor( chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, disableRelayRetry bool, relayRetriesManager *RelayRetriesManager, + relayExtensionManager *RelayExtensionManager, + directiveHeaders map[string]string, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) selection := Quorum // select the majority of node responses @@ -85,6 +92,7 @@ func 
NewRelayProcessor( if requiredSuccesses <= 0 { utils.LavaFormatFatal("invalid requirement, successes count must be greater than 0", nil, utils.LogAttr("requiredSuccesses", requiredSuccesses)) } + return &RelayProcessor{ usedProviders: usedProviders, requiredSuccesses: requiredSuccesses, @@ -102,6 +110,9 @@ func NewRelayProcessor( chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, disableRelayRetry: disableRelayRetry, relayRetriesManager: relayRetriesManager, + relayExtensionManager: relayExtensionManager, + userReturnHeaders: []pairingtypes.Metadata{}, + directiveHeaders: directiveHeaders, } } @@ -145,20 +156,89 @@ func (rp *RelayProcessor) String() string { usedProviders := rp.usedProviders rp.lock.RUnlock() - currentlyUsedAddresses := usedProviders.CurrentlyUsedAddresses() - unwantedAddresses := usedProviders.UnwantedAddresses() - return fmt.Sprintf("relayProcessor {results:%d, nodeErrors:%d, protocolErrors:%d,unwantedAddresses: %s,currentlyUsedAddresses:%s}", - results, nodeErrors, protocolErrors, strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";")) + extensionNameToUsedProvidersLogs := []string{} + for extensionName, extensionUsedProviders := range usedProviders { + currentlyUsedAddresses := extensionUsedProviders.CurrentlyUsedAddresses() + unwantedAddresses := extensionUsedProviders.UnwantedAddresses() + + extensionNameToUsedProvidersLog := fmt.Sprintf("%s: {currentlyUsedAddresses: %s, unwantedAddresses: %s}", extensionName, strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";")) + extensionNameToUsedProvidersLogs = append(extensionNameToUsedProvidersLogs, extensionNameToUsedProvidersLog) + } + return fmt.Sprintf("relayProcessor {results:%d, nodeErrors:%d, protocolErrors:%d, usedProviders:[%s]}", results, nodeErrors, protocolErrors, strings.Join(extensionNameToUsedProvidersLogs, ", ")) } -func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders { +func (rp *RelayProcessor) getUsedProvidersInner(usedProvidersKey string) (*lavasession.UsedProviders, error) { + usedProviders, ok := rp.usedProviders[usedProvidersKey] + if !ok { + return nil, utils.LavaFormatError("usedProvidersKey was not found in used providers map, returning default", nil, utils.LogAttr("usedProvidersKey", usedProvidersKey)) + } + return usedProviders, nil +} + +func (rp *RelayProcessor) GetCurrentUsedProviders() (*lavasession.UsedProviders, error) { if rp == nil { - utils.LavaFormatError("RelayProcessor.GetUsedProviders is nil, misuse detected", nil) - return nil + return nil, utils.LavaFormatError("RelayProcessor.GetCurrentUsedProviders is nil, misuse detected", nil) + } + rp.lock.RLock() + defer rp.lock.RUnlock() + return rp.getUsedProvidersInner(rp.chainMessage.GetConcatenatedExtensions()) +} + +func (rp *RelayProcessor) GetUsedProviders(extension string) (*lavasession.UsedProviders, error) { + if rp == nil { + return nil, utils.LavaFormatError("RelayProcessor.GetUsedProviders is nil, misuse detected", nil) + } + rp.lock.RLock() + defer rp.lock.RUnlock() + return rp.getUsedProvidersInner(extension) +} + +func (rp *RelayProcessor) GetAllUsedProviderAddresses() []string { + if rp == nil { + utils.LavaFormatError("RelayProcessor.GetAllErroredProviders is nil, misuse detected", nil) + return []string{} } rp.lock.RLock() defer rp.lock.RUnlock() - return rp.usedProviders + allUsedProviderAddresses := make([]string, 0) + + for _, usedProviders := range rp.usedProviders { + allUsedProviderAddresses = append(allUsedProviderAddresses, 
usedProviders.UnwantedAddresses()...) + } + return allUsedProviderAddresses +} + +func (rp *RelayProcessor) GetNumberOfAllCurrentlyUsedProviders() int { + if rp == nil { + utils.LavaFormatError("RelayProcessor.GetAllCurrentlyUsedProviders is nil, misuse detected", nil) + return 0 + } + rp.lock.RLock() + defer rp.lock.RUnlock() + var numberOfCurrentlyUsedProviders int + for _, usedProviders := range rp.usedProviders { + numberOfCurrentlyUsedProviders += usedProviders.CurrentlyUsed() + } + return numberOfCurrentlyUsedProviders +} + +func (rp *RelayProcessor) GetAllErroredProviders() map[string]struct{} { + if rp == nil { + utils.LavaFormatError("RelayProcessor.GetAllErroredProviders is nil, misuse detected", nil) + return map[string]struct{}{} + } + rp.lock.RLock() + defer rp.lock.RUnlock() + erroredProvidersMap := make(map[string]struct{}) + for extension, usedProviders := range rp.usedProviders { + for key, erroredProviders := range usedProviders.GetErroredProviders() { + if extension != "" { + key = extension + "_" + key + } + erroredProvidersMap[key] = erroredProviders + } + } + return erroredProvidersMap } // this function returns all results that came from a node, meaning success, and node errors @@ -283,8 +363,15 @@ func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool { return true } } + + usedProviders, ok := rp.usedProviders[rp.chainMessage.GetConcatenatedExtensions()] + if !ok { + utils.LavaFormatError("usedProviders not found", nil, utils.LogAttr("extension", rp.chainMessage.GetConcatenatedExtensions())) + return false + } + // check if we got all of the responses - if responsesCount >= rp.usedProviders.SessionsLatestBatch() { + if responsesCount >= usedProviders.SessionsLatestBatch() { // no active sessions, and we read all the responses, we can return return true } @@ -313,6 +400,67 @@ func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { return hashString, err } +func (rp *RelayProcessor) ForceManagedExtensionIfNeeded() { + rp.lock.Lock() + defer rp.lock.Unlock() + rp.forceManagedExtensionIfNeededInner() +} + +// used while locked +func (rp *RelayProcessor) addUsedProvidersIfNecessaryInner() { + extensionsKey := rp.chainMessage.GetConcatenatedExtensions() + if _, ok := rp.usedProviders[extensionsKey]; !ok { + rp.usedProviders[extensionsKey] = lavasession.NewUsedProviders(rp.directiveHeaders) + } +} + +func (rp *RelayProcessor) HandleExtensionChangesIfNecessary() { + // 1. add used providers key if necessary + // 2. reset relayData extensions to current chain message values + // 3. change the extension originally active status so we don't mess with it when we retry + rp.AddUsedProvidersKeyIfNecessary() + rp.relayExtensionManager.SetRelayDataExtensionsToChainMessageValues() + rp.relayExtensionManager.SetExtensionWasActiveOriginally() +} + +func (rp *RelayProcessor) AddUsedProvidersKeyIfNecessary() { + rp.lock.Lock() + defer rp.lock.Unlock() + rp.addUsedProvidersIfNecessaryInner() +} + +func (rp *RelayProcessor) forceManagedExtensionIfNeededInner() { + if rp.relayExtensionManager == nil || rp.relayExtensionManager.IsExtensionActiveByDefault() { + // We don't have an extension updater or we already have an extension enabled by the user, we don't need to force it + return + } + + if len(rp.chainMessage.GetRequestedBlocksHashes()) > 0 { + // Following scenarios: + // First retry will use managed extension. 
+ // Second retry will turn off managed extension and try again until retry attempts are exhausted + + numberOfRetriesHappened := len(rp.nodeResponseErrors.relayErrors) - 1 + if numberOfRetriesHappened < NumberOfRetriesAllowedOnNodeErrorsForArchiveExtension { + // If we have a hash and we are not in archive mode, we can retry with archive node + rp.relayExtensionManager.SetManagedExtension() + managedExtension := rp.relayExtensionManager.GetManagedExtensionName() + // after adding the extension we can use the new extension key for the used providers map. + rp.userReturnHeaders = append(rp.userReturnHeaders, pairingtypes.Metadata{Name: common.LAVA_EXTENSION_FORCED, Value: managedExtension}) + rp.addUsedProvidersIfNecessaryInner() + } else if numberOfRetriesHappened == NumberOfRetriesAllowedOnNodeErrorsForArchiveExtension { + // We already tried extension node, we can reset the flag and try a regular node again. + rp.relayExtensionManager.RemoveManagedExtension() + } + } +} + +func (rp *RelayProcessor) GetUserHeaders() []pairingtypes.Metadata { + rp.lock.RLock() + defer rp.lock.RUnlock() + return rp.userReturnHeaders +} + // Deciding wether we should send a relay retry attempt based on the node error func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, nodeErrors int, hash string) bool { // Retries will be performed based on the following scenarios: @@ -322,13 +470,13 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // 4. Number of retries < NumberOfRetriesAllowedOnNodeErrors. if !rp.disableRelayRetry && resultsCount == 0 && hashErr == nil { if nodeErrors <= NumberOfRetriesAllowedOnNodeErrors { - // TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature) - // Check hash already exist, if it does, we don't want to retry if !rp.relayRetriesManager.CheckHashInCache(hash) { // If we didn't find the hash in the hash map we can retry utils.LavaFormatTrace("retrying on relay error", utils.LogAttr("retry_number", nodeErrors), utils.LogAttr("hash", hash)) go rp.metricsInf.SetNodeErrorAttemptMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) + // check wether to retry using a managed extension + rp.forceManagedExtensionIfNeededInner() return false } utils.LavaFormatTrace("found hash in map wont retry", utils.LogAttr("hash", hash)) @@ -502,7 +650,7 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult } // this must be here before the lock because this function locks - allProvidersAddresses := rp.GetUsedProviders().UnwantedAddresses() + allProvidersAddresses := rp.GetAllUsedProviderAddresses() rp.lock.RLock() defer rp.lock.RUnlock() diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index bb21a8eda3..fc832e3e0d 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -11,11 +11,14 @@ import ( "github.com/lavanet/lava/v2/protocol/chainlib/extensionslib" "github.com/lavanet/lava/v2/protocol/common" "github.com/lavanet/lava/v2/protocol/lavasession" + "github.com/lavanet/lava/v2/utils/lavaslices" pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" spectypes "github.com/lavanet/lava/v2/x/spec/types" "github.com/stretchr/testify/require" ) +const paramsWithHash32Bits = `{"jsonrpc":"2.0","id":1,"method":"block","params":["HASH123456789123456789234567879123456789"]}` + type relayProcessorMetricsMock struct{} func (romm *relayProcessorMetricsMock) 
SetRelayNodeErrorMetric(chainId string, apiInterface string) {} @@ -35,9 +38,13 @@ var ( relayProcessorMetrics = &relayProcessorMetricsMock{} ) -func sendSuccessResp(relayProcessor *RelayProcessor, provider string, delay time.Duration) { +const nodeError = `{"error":{"message": "bad"}, "message":"bad","code":123}` + +func sendSuccessResp(t *testing.T, relayProcessor *RelayProcessor, provider string, delay time.Duration) { time.Sleep(delay) - relayProcessor.GetUsedProviders().RemoveUsed(provider, nil) + usedProviders, err := relayProcessor.GetUsedProviders(relayProcessor.chainMessage.GetConcatenatedExtensions()) + require.NoError(t, err) + usedProviders.RemoveUsed(provider, nil) response := &relayResponse{ relayResult: common.RelayResult{ Request: &pairingtypes.RelayRequest{ @@ -53,9 +60,11 @@ func sendSuccessResp(relayProcessor *RelayProcessor, provider string, delay time relayProcessor.SetResponse(response) } -func sendProtocolError(relayProcessor *RelayProcessor, provider string, delay time.Duration, err error) { +func sendProtocolError(t *testing.T, relayProcessor *RelayProcessor, provider string, delay time.Duration, err error) { time.Sleep(delay) - relayProcessor.GetUsedProviders().RemoveUsed(provider, err) + usedProviders, errGet := relayProcessor.GetUsedProviders(relayProcessor.chainMessage.GetConcatenatedExtensions()) + require.NoError(t, errGet) + usedProviders.RemoveUsed(provider, nil) response := &relayResponse{ relayResult: common.RelayResult{ Request: &pairingtypes.RelayRequest{ @@ -71,16 +80,18 @@ func sendProtocolError(relayProcessor *RelayProcessor, provider string, delay ti relayProcessor.SetResponse(response) } -func sendNodeError(relayProcessor *RelayProcessor, provider string, delay time.Duration) { +func sendNodeError(t *testing.T, relayProcessor *RelayProcessor, provider string, delay time.Duration) { time.Sleep(delay) - relayProcessor.GetUsedProviders().RemoveUsed(provider, nil) + usedProviders, err := relayProcessor.GetUsedProviders(relayProcessor.chainMessage.GetConcatenatedExtensions()) + require.NoError(t, err) + usedProviders.RemoveUsed(provider, nil) response := &relayResponse{ relayResult: common.RelayResult{ Request: &pairingtypes.RelayRequest{ RelaySession: &pairingtypes.RelaySession{}, RelayData: &pairingtypes.RelayPrivateData{}, }, - Reply: &pairingtypes.RelayReply{Data: []byte(`{"message":"bad","code":123}`)}, + Reply: &pairingtypes.RelayReply{Data: []byte(nodeError)}, ProviderInfo: common.ProviderInfo{ProviderAddress: provider}, StatusCode: http.StatusInternalServerError, }, @@ -89,6 +100,93 @@ func sendNodeError(relayProcessor *RelayProcessor, provider string, delay time.D relayProcessor.SetResponse(response) } +func TestRelayProcessorNodeErrorRetryFlowForceArchiveMessageWhenInitialIsArchive(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + }) + specId := "NEAR" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceJsonRPC, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + + testExtensionName := "banana" + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("", []byte(paramsWithHash32Bits), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + chainMsg.SetExtension(&spectypes.Extension{Name: extensionslib.ExtensionTypeArchive}) + 
chainMsg.SetExtension(&spectypes.Extension{Name: testExtensionName}) // add this extension to make sure it is not removed when we remove the archive extension + + relayPrivateData := &pairingtypes.RelayPrivateData{Extensions: []string{testExtensionName, extensionslib.ExtensionTypeArchive}} // add this extension to make sure it is not removed when we remove the archive extension + archiveExtension := &spectypes.Extension{Name: extensionslib.ExtensionTypeArchive} + relayExtensionManager := NewRelayExtensionManager(chainMsg, relayPrivateData, archiveExtension) + newUsedProvidersMap := map[string]*lavasession.UsedProviders{chainMsg.GetConcatenatedExtensions(): lavasession.NewUsedProviders(nil)} + relayProcessor := NewRelayProcessor(ctx, newUsedProvidersMap, 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, relayExtensionManager, nil) + + usedProviders, err := relayProcessor.GetUsedProviders(chainMsg.GetConcatenatedExtensions()) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + + // first node error + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + + // check first reply + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + + resultsOk := relayProcessor.HasResults() + require.True(t, resultsOk) + + requiredNodeResults := relayProcessor.HasRequiredNodeResults() + require.False(t, requiredNodeResults) + + // verify that the archive extension is still set; it was requested by the user from the start, so nothing needs to be forced + require.True(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), func(ext *spectypes.Extension) bool { return ext.Name == extensionslib.ExtensionTypeArchive })) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, extensionslib.ExtensionTypeArchive)) + + // verify that the original extensions are still there + require.True(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), func(ext *spectypes.Extension) bool { return ext.Name == testExtensionName })) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, testExtensionName)) + + // verify that the forced-extension header was not added, since archive was never forced + require.NotContains(t, relayProcessor.GetUserHeaders(), pairingtypes.Metadata{Name: common.LAVA_EXTENSION_FORCED, Value: extensionslib.ExtensionTypeArchive}) + + // second node error + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + + // check first retry + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.False(t, requiredNodeResults) + + // verify that the archive extension is still in the extensions list; a user-requested extension is never removed + require.True(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), relayExtensionManager.matchManagedExtension), chainMsg.GetExtensions()) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, extensionslib.ExtensionTypeArchive), relayPrivateData.Extensions) + + // verify that the original extensions are still there + require.True(t,
lavaslices.ContainsPredicate(chainMsg.GetExtensions(), func(ext *spectypes.Extension) bool { return ext.Name == testExtensionName }), chainMsg.GetExtensions()) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, testExtensionName), relayPrivateData.Extensions) + + // verify that the header + require.NotContains(t, relayProcessor.GetUserHeaders(), pairingtypes.Metadata{Name: common.LAVA_EXTENSION_FORCED, Value: extensionslib.ExtensionTypeArchive}) +} + func TestRelayProcessorHappyFlow(t *testing.T) { t.Run("happy", func(t *testing.T) { ctx := context.Background() @@ -104,9 +202,10 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -118,7 +217,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) { usedProviders.AddUsed(consumerSessionsMap, nil) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() - go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*5) + go sendSuccessResp(t, relayProcessor, "lava@test", time.Millisecond*5) err = relayProcessor.WaitForResults(ctx) require.NoError(t, err) resultsOk := relayProcessor.HasResults() @@ -132,180 +231,278 @@ func TestRelayProcessorHappyFlow(t *testing.T) { } func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { - t.Run("retry_flow", func(t *testing.T) { - ctx := context.Background() - serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Handle the incoming request and provide the desired response - w.WriteHeader(http.StatusOK) - }) - specId := "LAV1" - chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) - if closeServer != nil { - defer closeServer() + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + }) + specId := "LAV1" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + + usedProviders, err := 
relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + // check first reply + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk := relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults := relayProcessor.HasRequiredNodeResults() + require.False(t, requiredNodeResults) + // check first retry + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.False(t, requiredNodeResults) + + // check first second retry + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.True(t, requiredNodeResults) + + // 2nd relay, same inputs + // check hash map flow: + chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + usedProviders, err = relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse = usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap = lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + // check first reply, this time we have hash in map, so we don't retry node errors. 
+ go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.True(t, requiredNodeResults) + + // 3rd relay, different inputs + // check hash map flow: + chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + usedProviders, err = relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse = usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap = lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + // check first reply, this input hashes differently so its hash is not in the map yet and a retry is expected. + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + // check our hashing mechanism works with different inputs + require.False(t, requiredNodeResults) + + // 4th relay, same inputs, this time a successful relay, should remove the hash from the map + chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + usedProviders, err = relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse = usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap = lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + // check first reply, this time we have hash in map, so we don't retry node errors.
+ hash, err := relayProcessor.getInputMsgInfoHashString() + require.NoError(t, err) + require.True(t, relayProcessor.relayRetriesManager.CheckHashInCache(hash)) + go sendSuccessResp(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.True(t, requiredNodeResults) + + // A way for us to break early from sleep, just waiting up to 5 seconds and breaking as soon as the value we expect is there. + // After 5 seconds if its not there test will fail + for i := 0; i < 100; i++ { + if !relayProcessor.relayRetriesManager.CheckHashInCache(hash) { + break } - require.NoError(t, err) - chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - - usedProviders := relayProcessor.GetUsedProviders() - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - canUse := usedProviders.TryLockSelection(ctx) - require.NoError(t, ctx.Err()) - require.Nil(t, canUse) - require.Zero(t, usedProviders.CurrentlyUsed()) - require.Zero(t, usedProviders.SessionsLatestBatch()) - consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap, nil) - // check first reply - go sendNodeError(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk := relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults := relayProcessor.HasRequiredNodeResults() - require.False(t, requiredNodeResults) - // check first retry - go sendNodeError(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk = relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults = relayProcessor.HasRequiredNodeResults() - require.False(t, requiredNodeResults) - - // check first second retry - go sendNodeError(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk = relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults = relayProcessor.HasRequiredNodeResults() - require.True(t, requiredNodeResults) - - // 2nd relay, same inputs - // check hash map flow: - chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - usedProviders = relayProcessor.GetUsedProviders() - ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - canUse = usedProviders.TryLockSelection(ctx) - require.NoError(t, ctx.Err()) - require.Nil(t, canUse) - require.Zero(t, usedProviders.CurrentlyUsed()) - require.Zero(t, 
usedProviders.SessionsLatestBatch()) - consumerSessionsMap = lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap, nil) - // check first reply, this time we have hash in map, so we don't retry node errors. - go sendNodeError(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk = relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults = relayProcessor.HasRequiredNodeResults() - require.True(t, requiredNodeResults) - - // 3nd relay, different inputs - // check hash map flow: - chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - usedProviders = relayProcessor.GetUsedProviders() - ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - canUse = usedProviders.TryLockSelection(ctx) - require.NoError(t, ctx.Err()) - require.Nil(t, canUse) - require.Zero(t, usedProviders.CurrentlyUsed()) - require.Zero(t, usedProviders.SessionsLatestBatch()) - consumerSessionsMap = lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap, nil) - // check first reply, this time we have hash in map, so we don't retry node errors. - go sendNodeError(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk = relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults = relayProcessor.HasRequiredNodeResults() - // check our hashing mechanism works with different inputs - require.False(t, requiredNodeResults) + time.Sleep(time.Millisecond * 50) // sleep up to 5 seconds + } + // after the sleep we should not have the hash anymore in the map as it was removed by a successful relay. + require.False(t, relayProcessor.relayRetriesManager.CheckHashInCache(hash)) +} - // 4th relay, same inputs, this time a successful relay, should remove the hash from the map - chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - usedProviders = relayProcessor.GetUsedProviders() - ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - canUse = usedProviders.TryLockSelection(ctx) - require.NoError(t, ctx.Err()) - require.Nil(t, canUse) - require.Zero(t, usedProviders.CurrentlyUsed()) - require.Zero(t, usedProviders.SessionsLatestBatch()) - consumerSessionsMap = lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap, nil) - // check first reply, this time we have hash in map, so we don't retry node errors. 
- hash, err := relayProcessor.getInputMsgInfoHashString() - require.NoError(t, err) - require.True(t, relayProcessor.relayRetriesManager.CheckHashInCache(hash)) - go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk = relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults = relayProcessor.HasRequiredNodeResults() - require.True(t, requiredNodeResults) - - // A way for us to break early from sleep, just waiting up to 5 seconds and breaking as soon as the value we expect is there. - // After 5 seconds if its not there test will fail - for i := 0; i < 100; i++ { - if !relayProcessor.relayRetriesManager.CheckHashInCache(hash) { - break - } - time.Sleep(time.Millisecond * 50) // sleep up to 5 seconds - } - // after the sleep we should not have the hash anymore in the map as it was removed by a successful relay. - require.False(t, relayProcessor.relayRetriesManager.CheckHashInCache(hash)) +func TestRelayProcessorNodeErrorRetryFlowDisabled(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) }) + specId := "LAV1" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + relayProcessor.disableRelayRetry = true + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + // check first reply + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + resultsOk := relayProcessor.HasResults() + require.True(t, resultsOk) + requiredNodeResults := relayProcessor.HasRequiredNodeResults() + require.True(t, requiredNodeResults) +} - t.Run("retry_flow_disabled", func(t *testing.T) { - ctx := context.Background() - serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Handle the incoming request and provide the desired response - w.WriteHeader(http.StatusOK) - }) - specId := "LAV1" - chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) - if closeServer != nil { - defer closeServer() - } - require.NoError(t, err) - chainMsg, err := 
chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - relayProcessor.disableRelayRetry = true - usedProviders := relayProcessor.GetUsedProviders() - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - canUse := usedProviders.TryLockSelection(ctx) - require.NoError(t, ctx.Err()) - require.Nil(t, canUse) - require.Zero(t, usedProviders.CurrentlyUsed()) - require.Zero(t, usedProviders.SessionsLatestBatch()) - consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap, nil) - // check first reply - go sendNodeError(relayProcessor, "lava@test", time.Millisecond*5) - err = relayProcessor.WaitForResults(context.Background()) - require.NoError(t, err) - resultsOk := relayProcessor.HasResults() - require.True(t, resultsOk) - requiredNodeResults := relayProcessor.HasRequiredNodeResults() - require.True(t, requiredNodeResults) +func TestRelayProcessorNodeErrorRetryFlowForceArchiveLaterNoArchiveMessage(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) }) + specId := "NEAR" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceJsonRPC, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + + testExtensionName := "banana" + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("", []byte(paramsWithHash32Bits), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + chainMsg.SetExtension(&spectypes.Extension{Name: testExtensionName}) // add this extension to make sure it is not removed when we remove the archive extension + + relayPrivateData := &pairingtypes.RelayPrivateData{Extensions: []string{testExtensionName}} // add this extension to make sure it is not removed when we remove the archive extension + archiveExtension := &spectypes.Extension{Name: extensionslib.ExtensionTypeArchive} + relayExtensionManager := NewRelayExtensionManager(chainMsg, relayPrivateData, archiveExtension) + newUsedProvidersMap := map[string]*lavasession.UsedProviders{chainMsg.GetConcatenatedExtensions(): lavasession.NewUsedProviders(nil)} + relayProcessor := NewRelayProcessor(ctx, newUsedProvidersMap, 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, relayExtensionManager, nil) + + usedProviders, err := relayProcessor.GetUsedProviders(chainMsg.GetConcatenatedExtensions()) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + usedProviders.AddUsed(consumerSessionsMap, nil) + + // first node error + go 
sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + + // check first reply + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + + resultsOk := relayProcessor.HasResults() + require.True(t, resultsOk) + + requiredNodeResults := relayProcessor.HasRequiredNodeResults() + require.False(t, requiredNodeResults) + + // verify that the archive is now forced + require.True(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), func(ext *spectypes.Extension) bool { return ext.Name == extensionslib.ExtensionTypeArchive })) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, extensionslib.ExtensionTypeArchive)) + + // verify that the original extensions are still there + require.True(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), func(ext *spectypes.Extension) bool { return ext.Name == testExtensionName })) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, testExtensionName)) + + // verify that the forced-extension header was added + require.Contains(t, relayProcessor.GetUserHeaders(), pairingtypes.Metadata{Name: common.LAVA_EXTENSION_FORCED, Value: extensionslib.ExtensionTypeArchive}) + + // second node error + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + + // check first retry + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + + resultsOk = relayProcessor.HasResults() + require.True(t, resultsOk) + + requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.False(t, requiredNodeResults) + + // verify that the archive is now not in the extensions list + require.False(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), relayExtensionManager.matchManagedExtension), chainMsg.GetExtensions()) + require.False(t, lavaslices.Contains(relayPrivateData.Extensions, extensionslib.ExtensionTypeArchive), relayPrivateData.Extensions) + + // verify that the original extensions are still there + require.True(t, lavaslices.ContainsPredicate(chainMsg.GetExtensions(), func(ext *spectypes.Extension) bool { return ext.Name == testExtensionName }), chainMsg.GetExtensions()) + require.True(t, lavaslices.Contains(relayPrivateData.Extensions, testExtensionName), relayPrivateData.Extensions) + + // verify that the forced-extension header is still reported after the managed extension was removed + require.Contains(t, relayProcessor.GetUserHeaders(), pairingtypes.Metadata{Name: common.LAVA_EXTENSION_FORCED, Value: extensionslib.ExtensionTypeArchive}) + + // third node error + go sendNodeError(t, relayProcessor, "lava@test", time.Millisecond*5) + // check third retry + err = relayProcessor.WaitForResults(context.Background()) + require.NoError(t, err) + // should have enough results now.
+ requiredNodeResults = relayProcessor.HasRequiredNodeResults() + require.True(t, requiredNodeResults) } func TestRelayProcessorTimeout(t *testing.T) { @@ -323,9 +520,10 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -345,7 +543,7 @@ func TestRelayProcessorTimeout(t *testing.T) { consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test3": &lavasession.SessionInfo{}, "lava@test4": &lavasession.SessionInfo{}} usedProviders.AddUsed(consumerSessionsMap, nil) }() - go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*20) + go sendSuccessResp(t, relayProcessor, "lava@test", time.Millisecond*20) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() err = relayProcessor.WaitForResults(ctx) @@ -375,9 +573,10 @@ func TestRelayProcessorRetry(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -388,8 +587,8 @@ func TestRelayProcessorRetry(t *testing.T) { consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} usedProviders.AddUsed(consumerSessionsMap, nil) - go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) - go sendSuccessResp(relayProcessor, "lava@test2", time.Millisecond*20) + go sendProtocolError(t, relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) + go sendSuccessResp(t, relayProcessor, "lava@test2", time.Millisecond*20) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() err = relayProcessor.WaitForResults(ctx) @@ -419,9 +618,10 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", 
nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -432,8 +632,8 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} usedProviders.AddUsed(consumerSessionsMap, nil) - go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) - go sendNodeError(relayProcessor, "lava@test2", time.Millisecond*20) + go sendProtocolError(t, relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) + go sendNodeError(t, relayProcessor, "lava@test2", time.Millisecond*20) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() err = relayProcessor.WaitForResults(ctx) @@ -444,8 +644,8 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.Equal(t, uint64(1), protocolErrors) returnedResult, err := relayProcessor.ProcessingResult() require.NoError(t, err) - require.Equal(t, string(returnedResult.Reply.Data), `{"message":"bad","code":123}`) - require.Equal(t, returnedResult.StatusCode, http.StatusInternalServerError) + require.Equal(t, nodeError, string(returnedResult.Reply.Data)) + require.Equal(t, http.StatusInternalServerError, returnedResult.StatusCode) }) } @@ -464,8 +664,9 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - usedProviders := relayProcessor.GetUsedProviders() + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -475,10 +676,10 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava4@test": &lavasession.SessionInfo{}, "lava3@test": &lavasession.SessionInfo{}, "lava@test": &lavasession.SessionInfo{}, "lava2@test": &lavasession.SessionInfo{}} usedProviders.AddUsed(consumerSessionsMap, nil) - go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) - go sendNodeError(relayProcessor, 
"lava2@test", time.Millisecond*20) - go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25) - go sendSuccessResp(relayProcessor, "lava4@test", time.Millisecond*100) + go sendProtocolError(t, relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) + go sendNodeError(t, relayProcessor, "lava2@test", time.Millisecond*20) + go sendNodeError(t, relayProcessor, "lava3@test", time.Millisecond*25) + go sendSuccessResp(t, relayProcessor, "lava4@test", time.Millisecond*100) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() err = relayProcessor.WaitForResults(ctx) @@ -509,8 +710,9 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - usedProviders := relayProcessor.GetUsedProviders() + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -520,9 +722,9 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava4@test": &lavasession.SessionInfo{}, "lava3@test": &lavasession.SessionInfo{}, "lava@test": &lavasession.SessionInfo{}, "lava2@test": &lavasession.SessionInfo{}} usedProviders.AddUsed(consumerSessionsMap, nil) - go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) - go sendNodeError(relayProcessor, "lava2@test", time.Millisecond*20) - go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25) + go sendProtocolError(t, relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) + go sendNodeError(t, relayProcessor, "lava2@test", time.Millisecond*20) + go sendNodeError(t, relayProcessor, "lava3@test", time.Millisecond*25) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50) defer cancel() err = relayProcessor.WaitForResults(ctx) @@ -533,8 +735,8 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.Equal(t, uint64(1), protocolErrors) returnedResult, err := relayProcessor.ProcessingResult() require.NoError(t, err) - require.Equal(t, string(returnedResult.Reply.Data), `{"message":"bad","code":123}`) - require.Equal(t, returnedResult.StatusCode, http.StatusInternalServerError) + require.Equal(t, nodeError, string(returnedResult.Reply.Data)) + require.Equal(t, http.StatusInternalServerError, returnedResult.StatusCode) }) } @@ -553,8 +755,9 @@ func TestRelayProcessorLatest(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, 
false, relayRetriesManagerInstance) - usedProviders := relayProcessor.GetUsedProviders() + relayProcessor := NewRelayProcessor(ctx, lavasession.NewDefaultUsedProvidersMap(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance, nil, nil) + usedProviders, err := relayProcessor.GetUsedProviders(lavasession.DefaultExtensionsKey) + require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -566,8 +769,8 @@ func TestRelayProcessorLatest(t *testing.T) { consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} usedProviders.AddUsed(consumerSessionsMap, nil) - go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) - go sendSuccessResp(relayProcessor, "lava@test2", time.Millisecond*20) + go sendProtocolError(t, relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) + go sendSuccessResp(t, relayProcessor, "lava@test2", time.Millisecond*20) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() err = relayProcessor.WaitForResults(ctx) diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index fc01203918..c46ade721d 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -90,7 +90,7 @@ func (s *strategyValue) Type() string { type ConsumerStateTrackerInf interface { RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf) RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager) - RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error + RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint, offlineSpecOptions *statetracker.OfflineSpecOptions) error RegisterFinalizationConsensusForUpdates(context.Context, *finalizationconsensus.FinalizationConsensus) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error @@ -213,7 +213,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt policyUpdaters.Store(rpcEndpoint.ChainID, updaters.NewPolicyUpdater(chainID, consumerStateTracker, consumerAddr.String(), chainParser, *rpcEndpoint)) } // register for spec updates - err = rpcc.consumerStateTracker.RegisterForSpecUpdates(ctx, chainParser, *rpcEndpoint) + err = rpcc.consumerStateTracker.RegisterForSpecUpdates(ctx, chainParser, *rpcEndpoint, &statetracker.OfflineSpecOptions{SpecFilePath: options.cmdFlags.OfflineSpecPath, SpecId: rpcEndpoint.ChainID}) if err != nil { err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) errCh <- err @@ -557,6 +557,12 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 DebugRelays: viper.GetBool(DebugRelaysFlagName), DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag), DisableRetryOnNodeErrors: 
viper.GetBool(common.DisableRetryOnNodeErrorsFlag), + OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag), + } + + // validate that the user does not provide a multi-chain setup when using the offline spec feature. + if consumerPropagatedFlags.OfflineSpecPath != "" && len(rpcEndpoints) > 1 { + utils.LavaFormatFatal("offline spec modifications are supported only in single chain bootstrapping", nil, utils.LogAttr("len(rpcEndpoints)", len(rpcEndpoints)), utils.LogAttr("rpcEndpoints", rpcEndpoints)) } rpcConsumerSharedState := viper.GetBool(common.SharedStateFlag) @@ -599,6 +605,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.") cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider") + cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load an offline spec from the provided file path, used to test specs before they are proposed on chain") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index fb56e3c533..8eb0f790f4 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -9,11 +9,10 @@ import ( "sync" "time" - "github.com/goccy/go-json" - sdkerrors "cosmossdk.io/errors" "github.com/btcsuite/btcd/btcec/v2" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/goccy/go-json" "github.com/lavanet/lava/v2/protocol/chainlib" "github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient" "github.com/lavanet/lava/v2/protocol/chainlib/extensionslib" @@ -26,6 +25,7 @@ import ( "github.com/lavanet/lava/v2/protocol/metrics" "github.com/lavanet/lava/v2/protocol/performance" "github.com/lavanet/lava/v2/utils" + "github.com/lavanet/lava/v2/utils/lavaslices" "github.com/lavanet/lava/v2/utils/protocopy" "github.com/lavanet/lava/v2/utils/rand" conflicttypes "github.com/lavanet/lava/v2/x/conflict/types" @@ -226,12 +226,28 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, relay *pairingtypes.RelayPrivateData, chainMessage chainlib.ChainMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + extensionsKey := chainMessage.GetConcatenatedExtensions() + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, + map[string]*lavasession.UsedProviders{extensionsKey: usedProviders}, + 1, + chainMessage, + rpccs.consumerConsistency, + "-init-", + "", + rpccs.debugRelays, + rpccs.rpcConsumerLogs, + rpccs, + rpccs.disableNodeErrorRetry, + rpccs.relayRetriesManager, + nil, + nil, + ) for i := 0; i < retries; i++ { err = rpccs.sendRelayToProvider(ctx, chainMessage, relay, "-init-", "", relayProcessor, nil) if
lavasession.PairingListEmptyError.Is(err) { // we don't have pairings anymore, could be related to unwanted providers - relayProcessor.GetUsedProviders().ClearUnwanted() + usedProviders.ClearUnwanted() err = rpccs.sendRelayToProvider(ctx, chainMessage, relay, "-init-", "", relayProcessor, nil) } if err != nil { @@ -383,11 +399,12 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( if found { dataReliabilityContext = utils.WithUniqueIdentifier(dataReliabilityContext, guid) } - go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, dappID, consumerIp, chainMessage, dataReliabilityThreshold, relayProcessor) // runs asynchronously + + go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, dappID, consumerIp, chainMessage, dataReliabilityThreshold, relayProcessor, directiveHeaders) // runs asynchronously } returnedResult, err := relayProcessor.ProcessingResult() - rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, directiveHeaders) + rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, directiveHeaders, chainMessage) if err != nil { return returnedResult, utils.LavaFormatError("failed processing responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key())) } @@ -411,7 +428,32 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(directiveHeaders), rpccs.requiredResponses, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + + var relayExtensionManager *RelayExtensionManager + archiveExtension := rpccs.chainParser.ExtensionsParser().GetExtensionByName(extensionslib.ExtensionTypeArchive) + if archiveExtension != nil { // archive extension is configured + relayExtensionManager = NewRelayExtensionManager(chainMessage, relayRequestData, archiveExtension) + } + + newUsedProvidersMap := map[string]*lavasession.UsedProviders{chainMessage.GetConcatenatedExtensions(): lavasession.NewUsedProviders(directiveHeaders)} + + relayProcessor := NewRelayProcessor( + ctx, + newUsedProvidersMap, + rpccs.requiredResponses, + chainMessage, + rpccs.consumerConsistency, + dappID, + consumerIp, + rpccs.debugRelays, + rpccs.rpcConsumerLogs, + rpccs, + rpccs.disableNodeErrorRetry, + rpccs.relayRetriesManager, + relayExtensionManager, + directiveHeaders, + ) + var err error // try sending a relay 3 times. 
if failed return the error for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { @@ -460,7 +502,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH currentlyUsedIsEmptyCounter := 0 if err != nil { for validateNoProvidersAreUsed := 0; validateNoProvidersAreUsed < numberOfTimesToCheckCurrentlyUsedIsEmpty; validateNoProvidersAreUsed++ { - if relayProcessor.usedProviders.CurrentlyUsed() == 0 { + if relayProcessor.GetNumberOfAllCurrentlyUsedProviders() == 0 { currentlyUsedIsEmptyCounter++ } time.Sleep(5 * time.Millisecond) @@ -471,9 +513,11 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH } } } + // every relay timeout we send a new batch startNewBatchTicker := time.NewTicker(relayTimeout) defer startNewBatchTicker.Stop() + numberOfRetriesLaunched := 0 for { select { @@ -485,6 +529,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH if !relayProcessor.ShouldRetry(numberOfRetriesLaunched) { return relayProcessor, nil } + // otherwise continue sending another relay err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor, nil) go validateReturnCondition(err) @@ -497,6 +542,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH case <-startNewBatchTicker.C: // only trigger another batch for non BestResult relays or if we didn't pass the retry limit. if relayProcessor.ShouldRetry(numberOfRetriesLaunched) { + relayProcessor.ForceManagedExtensionIfNeeded() // limit the number of retries called from the new batch ticker flow. // if we pass the limit we just wait for the relays we sent to return. err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor, nil) @@ -583,6 +629,8 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // Get Session. we get session here so we can use the epoch in the callbacks reqBlock, _ := chainMessage.RequestedBlock() + earliestBlockHashRequested := spectypes.NOT_APPLICABLE + latestBlockHashRequested := spectypes.NOT_APPLICABLE // try using cache before sending relay var cacheError error if rpccs.cache.CacheActive() { // use cache only if its defined. 
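The hunk above starts tracking the earliest and latest heights implied by any block hashes the user requested, and the next hunk forwards those hashes to the cache as BlocksHashesToHeights hints. Below is a minimal sketch of that conversion, mirroring the newBlocksHashesToHeightsSliceFromRequestedBlockHashes helper added later in this diff; the package name is illustrative and the pairingtypes/spectypes import paths are assumed from the module layout shown elsewhere in the diff.

package blockhints_sketch

import (
	pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
	spectypes "github.com/lavanet/lava/v2/x/spec/types"
)

// RequestedHashesToHints converts user-requested block hashes into cache hints.
// Heights are unknown on the consumer side at this point, so they are sent as
// NOT_APPLICABLE and resolved by the cache if it has seen the hashes before.
func RequestedHashesToHints(requestedHashes []string) []*pairingtypes.BlockHashToHeight {
	hints := make([]*pairingtypes.BlockHashToHeight, 0, len(requestedHashes))
	for _, hash := range requestedHashes {
		hints = append(hints, &pairingtypes.BlockHashToHeight{Hash: hash, Height: spectypes.NOT_APPLICABLE})
	}
	return hints
}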
@@ -595,13 +643,14 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } else { cacheCtx, cancel := context.WithTimeout(ctx, common.CacheTimeout) cacheReply, cacheError = rpccs.cache.GetEntry(cacheCtx, &pairingtypes.RelayCacheGet{ - RequestHash: hashKey, - RequestedBlock: relayRequestData.RequestBlock, - ChainId: chainId, - BlockHash: nil, - Finalized: false, - SharedStateId: sharedStateId, - SeenBlock: relayRequestData.SeenBlock, + RequestedBlock: relayRequestData.RequestBlock, + RequestHash: hashKey, + ChainId: chainId, + BlockHash: nil, + Finalized: false, + SharedStateId: sharedStateId, + SeenBlock: relayRequestData.SeenBlock, + BlocksHashesToHeights: rpccs.newBlocksHashesToHeightsSliceFromRequestedBlockHashes(chainMessage.GetRequestedBlocksHashes()), }) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet cancel() reply := cacheReply.GetReply() @@ -637,6 +686,9 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( }) return nil } + + latestBlockHashRequested, earliestBlockHashRequested = rpccs.getEarliestBlockHashRequestedFromCacheReply(cacheReply) + // cache failed, move on to regular relay if performance.NotConnectedError.Is(cacheError) { utils.LavaFormatDebug("cache not connected", utils.LogAttr("error", cacheError)) @@ -648,15 +700,15 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } } - if reqBlock == spectypes.LATEST_BLOCK && relayRequestData.SeenBlock != 0 { - // make optimizer select a provider that is likely to have the latest seen block - reqBlock = relayRequestData.SeenBlock - } // consumerEmergencyTracker always use latest virtual epoch virtualEpoch := rpccs.consumerTxSender.GetLatestVirtualEpoch() addon := chainlib.GetAddon(chainMessage) + reqBlock = rpccs.resolveRequestedBlockAndUpdateExtensionIfNeeded(reqBlock, relayRequestData, latestBlockHashRequested, earliestBlockHashRequested, addon, relayProcessor, chainMessage) extensions := chainMessage.GetExtensions() - usedProviders := relayProcessor.GetUsedProviders() + usedProviders, err := relayProcessor.GetCurrentUsedProviders() + if err != nil { + return err + } sessions, err := rpccs.consumerSessionManager.GetSessions(ctx, chainlib.GetComputeUnits(chainMessage), usedProviders, reqBlock, addon, extensions, chainlib.GetStateful(chainMessage), virtualEpoch) if err != nil { if lavasession.PairingListEmptyError.Is(err) { @@ -691,6 +743,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( utils.LogAttr("addons", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("AllowSessionDegradation", relayProcessor.GetAllowSessionDegradation()), + utils.LogAttr("relayRequestData_extensions", relayRequestData.Extensions), ) } @@ -853,6 +906,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( requestedBlock := localRelayResult.Request.RelayData.RequestBlock // get requested block before removing it from the data seenBlock := localRelayResult.Request.RelayData.SeenBlock // get seen block before removing it from the data hashKey, _, hashErr := chainlib.HashCacheRequest(localRelayResult.Request.RelayData, chainId) // get the hash (this changes the data) + finalizedBlockHashes := localRelayResult.Reply.FinalizedBlocksHashes go func() { // deal with copying error. 
@@ -863,27 +917,45 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( ) return } + chainMessageRequestedBlock, _ := chainMessage.RequestedBlock() if chainMessageRequestedBlock == spectypes.NOT_APPLICABLE { return } + blockHashesToHeights := make([]*pairingtypes.BlockHashToHeight, 0) + + var finalizedBlockHashesObj map[int64]string + err := json.Unmarshal(finalizedBlockHashes, &finalizedBlockHashesObj) + if err != nil { + utils.LavaFormatError("failed unmarshalling finalizedBlockHashes", err, + utils.LogAttr("GUID", ctx), + utils.LogAttr("finalizedBlockHashes", finalizedBlockHashes), + utils.LogAttr("providerAddr", providerPublicAddress), + ) + } else { + blockHashesToHeights = rpccs.newBlocksHashesToHeightsSliceFromFinalizationConsensus(finalizedBlockHashesObj) + } + + blockHashesToHeights = rpccs.updateBlocksHashesToHeightsIfNeeded(extensions, chainMessage, blockHashesToHeights, latestBlock) + new_ctx := context.Background() new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease) defer cancel() _, averageBlockTime, _, _ := rpccs.chainParser.ChainBlockStats() err2 := rpccs.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ - RequestHash: hashKey, - ChainId: chainId, - RequestedBlock: requestedBlock, - SeenBlock: seenBlock, - BlockHash: nil, // consumer cache doesn't care about block hashes - Response: copyReply, - Finalized: localRelayResult.Finalized, - OptionalMetadata: nil, - SharedStateId: sharedStateId, - AverageBlockTime: int64(averageBlockTime), // by using average block time we can set longer TTL - IsNodeError: isNodeError, + RequestHash: hashKey, + ChainId: chainId, + RequestedBlock: requestedBlock, + SeenBlock: seenBlock, + BlockHash: nil, // consumer cache doesn't care about block hashes + Response: copyReply, + Finalized: localRelayResult.Finalized, + OptionalMetadata: nil, + SharedStateId: sharedStateId, + AverageBlockTime: int64(averageBlockTime), // by using average block time we can set longer TTL + IsNodeError: isNodeError, + BlocksHashesToHeights: blockHashesToHeights, }) if err2 != nil { utils.LavaFormatWarning("error updating cache with new entry", err2) @@ -898,6 +970,91 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( return nil } +func (rpccs *RPCConsumerServer) updateBlocksHashesToHeightsIfNeeded(extensions []*spectypes.Extension, chainMessage chainlib.ChainMessage, blockHashesToHeights []*pairingtypes.BlockHashToHeight, latestBlock int64) []*pairingtypes.BlockHashToHeight { + // This function will add the requested block hash with the height of the block that will force it to be archive on the following conditions: + // 1. The current extension is archive. + // 2. The user requested a single block hash. + // 3. The archive extension rule is set. 
+ + predicate := func(ext *spectypes.Extension) bool { return ext.Name == extensionslib.ExtensionTypeArchive } + isCurrentlyArchiveExtension := lavaslices.ContainsPredicate(extensions, predicate) + requestedBlocksHashes := chainMessage.GetRequestedBlocksHashes() + isUserRequestedSingleBlocksHashes := len(requestedBlocksHashes) == 1 + archiveExtension := rpccs.chainParser.ExtensionsParser().GetExtensionByName(extensionslib.ExtensionTypeArchive) + + if isCurrentlyArchiveExtension && isUserRequestedSingleBlocksHashes && archiveExtension != nil && archiveExtension.Rule != nil { + ruleBlock := int64(archiveExtension.Rule.Block) + if ruleBlock >= 0 { + height := latestBlock - ruleBlock - 1 + if height < 0 { + height = 0 + } + blockHashesToHeights = append(blockHashesToHeights, &pairingtypes.BlockHashToHeight{ + Hash: requestedBlocksHashes[0], + Height: height, + }) + } + } + + return blockHashesToHeights +} + +func (rpccs *RPCConsumerServer) newBlocksHashesToHeightsSliceFromRequestedBlockHashes(requestedBlockHashes []string) []*pairingtypes.BlockHashToHeight { + var blocksHashesToHeights []*pairingtypes.BlockHashToHeight + for _, blockHash := range requestedBlockHashes { + blocksHashesToHeights = append(blocksHashesToHeights, &pairingtypes.BlockHashToHeight{Hash: blockHash, Height: spectypes.NOT_APPLICABLE}) + } + return blocksHashesToHeights +} + +func (rpccs *RPCConsumerServer) newBlocksHashesToHeightsSliceFromFinalizationConsensus(finalizedBlockHashes map[int64]string) []*pairingtypes.BlockHashToHeight { + var blocksHashesToHeights []*pairingtypes.BlockHashToHeight + for height, blockHash := range finalizedBlockHashes { + blocksHashesToHeights = append(blocksHashesToHeights, &pairingtypes.BlockHashToHeight{Hash: blockHash, Height: height}) + } + return blocksHashesToHeights +} + +func (rpccs *RPCConsumerServer) resolveRequestedBlockAndUpdateExtensionIfNeeded(reqBlock int64, relayRequestData *pairingtypes.RelayPrivateData, latestBlockHashRequested, earliestBlockHashRequested int64, addon string, relayProcessor *RelayProcessor, chainMessage chainlib.ChainMessage) int64 { + if reqBlock == spectypes.LATEST_BLOCK && relayRequestData.SeenBlock != 0 { + // make optimizer select a provider that is likely to have the latest seen block + reqBlock = relayRequestData.SeenBlock + } + // If latestBlockHashRequested provides more info on the relay we can set it as reqBlock + if latestBlockHashRequested >= 0 && (reqBlock == spectypes.LATEST_BLOCK || reqBlock < latestBlockHashRequested) { + reqBlock = latestBlockHashRequested + } + + if earliestBlockHashRequested >= 0 { + // change earliest requested block if applicable + success := chainMessage.UpdateEarliestInMessage(earliestBlockHashRequested) + if success { + rulePassed := rpccs.chainParser.ExtensionsParser().ExtensionParsing(addon, chainMessage, uint64(relayRequestData.SeenBlock)) + if rulePassed { + // handle extension changes + relayProcessor.HandleExtensionChangesIfNecessary() + } + } + } + return reqBlock +} + +func (rpccs *RPCConsumerServer) getEarliestBlockHashRequestedFromCacheReply(cacheReply *pairingtypes.CacheRelayReply) (int64, int64) { + blocksHashesToHeights := cacheReply.GetBlocksHashesToHeights() + earliestRequestedBlock := spectypes.NOT_APPLICABLE + latestRequestedBlock := spectypes.NOT_APPLICABLE + + for _, blockHashToHeight := range blocksHashesToHeights { + if blockHashToHeight.Height >= 0 && (earliestRequestedBlock == spectypes.NOT_APPLICABLE || blockHashToHeight.Height < earliestRequestedBlock) { + earliestRequestedBlock = 
blockHashToHeight.Height + } + if blockHashToHeight.Height >= 0 && (latestRequestedBlock == spectypes.NOT_APPLICABLE || blockHashToHeight.Height > latestRequestedBlock) { + latestRequestedBlock = blockHashToHeight.Height + } + } + return latestRequestedBlock, earliestRequestedBlock +} + func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSession *lavasession.SingleConsumerSession, relayResult *common.RelayResult, relayTimeout time.Duration, chainMessage chainlib.ChainMessage, consumerToken string, analytics *metrics.RelayMetrics) (relayLatency time.Duration, err error, needsBackoff bool) { existingSessionLatestBlock := singleConsumerSession.LatestBlock // we read it now because singleConsumerSession is locked, and later it's not endpointClient := singleConsumerSession.EndpointConnection.Client @@ -1187,10 +1344,10 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h return &reply, nil } -func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, dappID string, consumerIp string, chainMessage chainlib.ChainMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { +func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, dappID string, consumerIp string, chainMessage chainlib.ChainMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor, directiveHeaders map[string]string) error { processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(chainMessage) // Wait another relayTimeout duration to maybe get additional relay results - if relayProcessor.usedProviders.CurrentlyUsed() > 0 { + if relayProcessor.GetNumberOfAllCurrentlyUsedProviders() > 0 { time.Sleep(expectedRelayTimeout) } @@ -1228,7 +1385,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context relayResult := results[0] if len(results) < 2 { relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager, nil, directiveHeaders) err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) @@ -1327,7 +1484,7 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch chainMessage.SetForceCacheRefresh(ok) } -func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, directiveHeaders map[string]string) { +func (rpccs *RPCConsumerServer) 
appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, directiveHeaders map[string]string, chainMessage chainlib.ChainMessage) { if relayResult == nil { return } @@ -1382,9 +1539,11 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) } + relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, relayProcessor.GetUserHeaders()...) + _, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY] if debugRelays { - erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders() + erroredProviders := relayProcessor.GetAllErroredProviders() if len(erroredProviders) > 0 { erroredProvidersArray := make([]string, len(erroredProviders)) idx := 0 diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index 26dc8fd01f..dea0acb1be 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -12,12 +12,18 @@ import ( "github.com/lavanet/lava/v2/protocol/lavasession" "github.com/lavanet/lava/v2/protocol/metrics" updaters "github.com/lavanet/lava/v2/protocol/statetracker/updaters" + specutils "github.com/lavanet/lava/v2/testutil/keeper" "github.com/lavanet/lava/v2/utils" conflicttypes "github.com/lavanet/lava/v2/x/conflict/types" plantypes "github.com/lavanet/lava/v2/x/plans/types" protocoltypes "github.com/lavanet/lava/v2/x/protocol/types" ) +type OfflineSpecOptions struct { + SpecId string + SpecFilePath string +} + type ConsumerTxSenderInf interface { TxSenderConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict) error } @@ -118,7 +124,17 @@ func (cst *ConsumerStateTracker) TxConflictDetection(ctx context.Context, finali return err } -func (cst *ConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error { +func (cst *ConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint, offlineSpecOptions *OfflineSpecOptions) error { + if offlineSpecOptions.SpecFilePath != "" { + parsedOfflineSpec, err := specutils.GetSpecFromPath(offlineSpecOptions.SpecFilePath, offlineSpecOptions.SpecId, nil, nil) + if err != nil { + utils.LavaFormatFatal("failed loading offline spec", err, utils.LogAttr("spec_path", offlineSpecOptions.SpecFilePath), utils.LogAttr("spec_id", offlineSpecOptions.SpecId)) + } + utils.LavaFormatInfo("Loaded offline spec successfully", utils.LogAttr("spec_path", offlineSpecOptions.SpecFilePath), utils.LogAttr("chain_id", parsedOfflineSpec.Index)) + specUpdatable.SetSpec(parsedOfflineSpec) + return nil + } + // register for spec updates sets spec and updates when a spec has been modified specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, cst.stateQuery, cst.EventTracker) specUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, specUpdater) diff --git a/testutil/keeper/spec.go b/testutil/keeper/spec.go index 3574f15040..f76965c5a4 100644 --- a/testutil/keeper/spec.go +++ b/testutil/keeper/spec.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "os" - "strings" "testing" tmdb "github.com/cometbft/cometbft-db" @@ -67,7 +66,20 @@ func specKeeper() (*keeper.Keeper, sdk.Context, error) { return k, ctx, nil } -func GetASpec(specIndex, 
getToTopMostPath string, ctxArg *sdk.Context, keeper *keeper.Keeper) (specRet spectypes.Spec, err error) { +func decodeProposal(path string) (utils.SpecAddProposalJSON, error) { + proposal := utils.SpecAddProposalJSON{} + contents, err := os.ReadFile(path) + if err != nil { + return proposal, err + } + decoder := json.NewDecoder(bytes.NewReader(contents)) + decoder.DisallowUnknownFields() // This will make the unmarshal fail if there are unused fields + + err = decoder.Decode(&proposal) + return proposal, err +} + +func GetSpecFromPath(path string, specIndex string, ctxArg *sdk.Context, keeper *keeper.Keeper) (specRet spectypes.Spec, err error) { var ctx sdk.Context if keeper == nil || ctxArg == nil { keeper, ctx, err = specKeeper() @@ -77,31 +89,48 @@ func GetASpec(specIndex, getToTopMostPath string, ctxArg *sdk.Context, keeper *k } else { ctx = *ctxArg } - proposalFile := "./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json" - for _, fileName := range strings.Split(proposalFile, ",") { - proposal := utils.SpecAddProposalJSON{} - contents, err := os.ReadFile(getToTopMostPath + fileName) + proposal, err := decodeProposal(path) + if err != nil { + return spectypes.Spec{}, err + } + + for _, spec := range proposal.Proposal.Specs { + keeper.SetSpec(ctx, spec) + if specIndex != spec.Index { + continue + } + fullspec, err := keeper.ExpandSpec(ctx, spec) if err != nil { return spectypes.Spec{}, err } - decoder := json.NewDecoder(bytes.NewReader(contents)) - decoder.DisallowUnknownFields() // This will make the unmarshal fail if there are unused fields + return fullspec, nil + } + return spectypes.Spec{}, fmt.Errorf("spec not found %s", path) +} - if err := decoder.Decode(&proposal); err != nil { +func GetASpec(specIndex, getToTopMostPath string, ctxArg *sdk.Context, keeper *keeper.Keeper) (specRet spectypes.Spec, err error) { + var ctx sdk.Context + if keeper == nil || ctxArg == nil { + keeper, ctx, err = specKeeper() + if err != nil { return spectypes.Spec{}, err } - - for _, spec := range proposal.Proposal.Specs { - keeper.SetSpec(ctx, spec) - if specIndex != spec.Index { - continue - } - fullspec, err := keeper.ExpandSpec(ctx, spec) - if err != nil { - return spectypes.Spec{}, err - } - return fullspec, nil + } else { + ctx = *ctxArg + } + proposalDirectory := "cookbook/specs/" + proposalFiles := []string{ + "ibc.json", "cosmoswasm.json", "tendermint.json", "cosmossdk.json", "cosmossdk_full.json", + "ethereum.json", "cosmoshub.json", "lava.json", "osmosis.json", "fantom.json", "celo.json", + "optimism.json", "arbitrum.json", "starknet.json", "aptos.json", "juno.json", "polygon.json", + "evmos.json", "base.json", "canto.json", "sui.json", "solana.json", "bsc.json", "axelar.json", + "avalanche.json", "fvm.json", "near.json", + } + for _, fileName := range proposalFiles { + spec, err := 
GetSpecFromPath(getToTopMostPath+proposalDirectory+fileName, specIndex, &ctx, keeper) + if err == nil { + return spec, nil } } return spectypes.Spec{}, fmt.Errorf("spec not found %s", specIndex) diff --git a/utils/lavaslices/slices.go b/utils/lavaslices/slices.go index 747b5fc3f0..827ac9b167 100644 --- a/utils/lavaslices/slices.go +++ b/utils/lavaslices/slices.go @@ -131,6 +131,15 @@ func Contains[T comparable](slice []T, elem T) bool { return false } +func ContainsPredicate[T comparable](slice []T, predicate func(elem T) bool) bool { + for _, e := range slice { + if predicate(e) { + return true + } + } + return false +} + // Remove removes the first instance (if exists) of elem from the slice, and // returns the new slice and indication if removal took place. func Remove[T comparable](slice []T, elem T) ([]T, bool) { diff --git a/utils/maps/maps.go b/utils/maps/maps.go index 293646c458..64d5040db7 100644 --- a/utils/maps/maps.go +++ b/utils/maps/maps.go @@ -21,6 +21,18 @@ func FindLargestIntValueInMap[K comparable](myMap map[K]int) (K, int) { return maxKey, maxVal } +func FindInMap[K comparable, V any](m map[K]V, predicate func(K, V) bool) (K, V, bool) { + for k, v := range m { + if predicate(k, v) { + return k, v, true + } + } + + var zeroK K + var zeroV V + return zeroK, zeroV, false +} + func StableSortedKeys[T constraints.Ordered, V any](m map[T]V) []T { keys := make([]T, 0, len(m)) for k := range m {
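// Illustrative usage of the two new generic helpers introduced above (a sketch,
// not part of this diff): lavaslices.ContainsPredicate reports whether any element
// of a slice satisfies a predicate, and maps.FindInMap returns the first key/value
// pair matching a predicate (map iteration order is not deterministic, so "first"
// is arbitrary when several entries match). The extension type below is a stand-in;
// the real caller in this diff passes []*spectypes.Extension and matches on
// ext.Name == extensionslib.ExtensionTypeArchive.
package helpers_sketch

import (
	"fmt"

	"github.com/lavanet/lava/v2/utils/lavaslices"
	"github.com/lavanet/lava/v2/utils/maps"
)

type extension struct{ Name string }

func ExampleUsage() {
	exts := []*extension{{Name: "archive"}, {Name: "trace"}}
	hasArchive := lavaslices.ContainsPredicate(exts, func(e *extension) bool { return e.Name == "archive" })
	fmt.Println(hasArchive) // true

	heights := map[string]int64{"0xabc": 120, "0xdef": -1}
	hash, height, found := maps.FindInMap(heights, func(_ string, h int64) bool { return h >= 0 })
	fmt.Println(hash, height, found) // 0xabc 120 true -- the only entry with a non-negative height
}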