From c5542b23e2f3da290ba735b4150537501b86964c Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Sun, 8 Sep 2024 14:04:05 +0200 Subject: [PATCH] feat: PRT - Consumer state machine (#1668) * WIP - new state machine * WIP2 * Did I just finish this redesign in 2 hours? :O * fix v3 issues * fixing statistical tests. * fixing batch update issue and adding unitests * remove spam * commit changes wip * add used providers to relay state machine. * return on should retry false. * fix state machine. * reduce warning level to context done as it now can happen * fix test * merged --- .../policy_all_chains_with_extension.yml | 74 +++++ protocol/lavasession/used_providers.go | 13 + .../consumer_relay_state_machine.go | 254 ++++++++++++++++++ .../consumer_relay_state_machine_test.go | 152 +++++++++++ protocol/rpcconsumer/relay_processor.go | 46 ++-- protocol/rpcconsumer/relay_processor_test.go | 73 +++-- protocol/rpcconsumer/rpcconsumer_server.go | 156 +++-------- .../rpcprovider/provider_state_machine.go | 6 +- 8 files changed, 594 insertions(+), 180 deletions(-) create mode 100644 protocol/rpcconsumer/consumer_relay_state_machine.go create mode 100644 protocol/rpcconsumer/consumer_relay_state_machine_test.go diff --git a/cookbook/projects/policy_all_chains_with_extension.yml b/cookbook/projects/policy_all_chains_with_extension.yml index 25ef52bbd1..491e9bd047 100644 --- a/cookbook/projects/policy_all_chains_with_extension.yml +++ b/cookbook/projects/policy_all_chains_with_extension.yml @@ -22,6 +22,14 @@ Policy: extensions: - "archive" mixed: true + - chain_id: NEART + requirements: + - collection: + api_interface: "jsonrpc" + type: "POST" + extensions: + - "archive" + mixed: true - chain_id: SEP1 requirements: - collection: @@ -56,6 +64,32 @@ Policy: extensions: - "archive" mixed: true + - chain_id: EVMOST + requirements: + - collection: + api_interface: "jsonrpc" + type: "POST" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "rest" + type: "GET" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "grpc" + type: "" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "tendermintrpc" + type: "" + extensions: + - "archive" + mixed: true - chain_id: LAV1 requirements: - collection: @@ -96,4 +130,44 @@ Policy: extensions: - "archive" mixed: true + - chain_id: AXELAR + requirements: + - collection: + api_interface: "rest" + type: "GET" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "grpc" + type: "" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "tendermintrpc" + type: "" + extensions: + - "archive" + mixed: true + - chain_id: AXELART + requirements: + - collection: + api_interface: "rest" + type: "GET" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "grpc" + type: "" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "tendermintrpc" + type: "" + extensions: + - "archive" + mixed: true - chain_id: "*" # allows all other chains without specifying \ No newline at end of file diff --git a/protocol/lavasession/used_providers.go b/protocol/lavasession/used_providers.go index caf3c78d8e..dcf95d951e 100644 --- a/protocol/lavasession/used_providers.go +++ b/protocol/lavasession/used_providers.go @@ -40,6 +40,7 @@ type UsedProviders struct { erroredProviders map[string]struct{} // providers who returned protocol errors (used to debug relays for now) blockOnSyncLoss map[string]struct{} 
sessionsLatestBatch int + batchNumber int } func (up *UsedProviders) CurrentlyUsed() int { @@ -62,6 +63,16 @@ func (up *UsedProviders) SessionsLatestBatch() int { return up.sessionsLatestBatch } +func (up *UsedProviders) BatchNumber() int { + if up == nil { + utils.LavaFormatError("UsedProviders.BatchNumber is nil, misuse detected", nil) + return 0 + } + up.lock.RLock() + defer up.lock.RUnlock() + return up.batchNumber +} + func (up *UsedProviders) CurrentlyUsedAddresses() []string { if up == nil { utils.LavaFormatError("UsedProviders.CurrentlyUsedAddresses is nil, misuse detected", nil) @@ -149,6 +160,8 @@ func (up *UsedProviders) AddUsed(sessions ConsumerSessionsMap, err error) { up.providers[provider] = struct{}{} up.sessionsLatestBatch++ } + // increase batch number + up.batchNumber++ } up.selecting = false } diff --git a/protocol/rpcconsumer/consumer_relay_state_machine.go b/protocol/rpcconsumer/consumer_relay_state_machine.go new file mode 100644 index 0000000000..83f7d535b0 --- /dev/null +++ b/protocol/rpcconsumer/consumer_relay_state_machine.go @@ -0,0 +1,254 @@ +package rpcconsumer + +import ( + context "context" + "time" + + "github.com/lavanet/lava/v3/protocol/chainlib" + common "github.com/lavanet/lava/v3/protocol/common" + lavasession "github.com/lavanet/lava/v3/protocol/lavasession" + "github.com/lavanet/lava/v3/protocol/metrics" + "github.com/lavanet/lava/v3/utils" +) + +type RelayStateMachine interface { + GetProtocolMessage() chainlib.ProtocolMessage + ShouldRetry(numberOfRetriesLaunched int) bool + GetDebugState() bool + GetRelayTaskChannel() chan RelayStateSendInstructions + UpdateBatch(err error) + GetSelection() Selection + GetUsedProviders() *lavasession.UsedProviders + SetRelayProcessor(relayProcessor *RelayProcessor) +} + +type ConsumerRelaySender interface { + sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) + getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) + GetChainIdAndApiInterface() (string, string) +} + +type tickerMetricSetterInf interface { + SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string) +} + +type ConsumerRelayStateMachine struct { + ctx context.Context // same context as user context. + relaySender ConsumerRelaySender + parentRelayProcessor *RelayProcessor + protocolMessage chainlib.ProtocolMessage // only one should make changes to protocol message is ConsumerRelayStateMachine. 
+ analytics *metrics.RelayMetrics // first relay metrics + selection Selection + debugRelays bool + tickerMetricSetter tickerMetricSetterInf + batchUpdate chan error + usedProviders *lavasession.UsedProviders +} + +func NewRelayStateMachine( + ctx context.Context, + usedProviders *lavasession.UsedProviders, + relaySender ConsumerRelaySender, + protocolMessage chainlib.ProtocolMessage, + analytics *metrics.RelayMetrics, + debugRelays bool, + tickerMetricSetter tickerMetricSetterInf, +) RelayStateMachine { + selection := Quorum // select the majority of node responses + if chainlib.GetStateful(protocolMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { + selection = BestResult // select the majority of node successes + } + + return &ConsumerRelayStateMachine{ + ctx: ctx, + usedProviders: usedProviders, + relaySender: relaySender, + protocolMessage: protocolMessage, + analytics: analytics, + selection: selection, + debugRelays: debugRelays, + tickerMetricSetter: tickerMetricSetter, + batchUpdate: make(chan error, MaximumNumberOfTickerRelayRetries), + } +} + +func (crsm *ConsumerRelayStateMachine) SetRelayProcessor(relayProcessor *RelayProcessor) { + crsm.parentRelayProcessor = relayProcessor +} + +func (crsm *ConsumerRelayStateMachine) GetUsedProviders() *lavasession.UsedProviders { + return crsm.usedProviders +} + +func (crsm *ConsumerRelayStateMachine) GetSelection() Selection { + return crsm.selection +} + +func (crsm *ConsumerRelayStateMachine) ShouldRetry(numberOfRetriesLaunched int) bool { + if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries { + return false + } + // best result sends to top 10 providers anyway. + return crsm.selection != BestResult +} + +func (crsm *ConsumerRelayStateMachine) GetDebugState() bool { + return crsm.debugRelays +} + +func (crsm *ConsumerRelayStateMachine) GetProtocolMessage() chainlib.ProtocolMessage { + return crsm.protocolMessage +} + +type RelayStateSendInstructions struct { + protocolMessage chainlib.ProtocolMessage + analytics *metrics.RelayMetrics + err error + done bool +} + +func (rssi *RelayStateSendInstructions) IsDone() bool { + return rssi.done || rssi.err != nil +} + +func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSendInstructions { + relayTaskChannel := make(chan RelayStateSendInstructions) + go func() { + batchNumber := 0 // Set batch number + // A channel to be notified processing was done, true means we have results and can return + gotResults := make(chan bool, 1) + processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(crsm.GetProtocolMessage()) + if crsm.debugRelays { + utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout)) + } + // Create the processing timeout prior to entering the method so it wont reset every time + processingCtx, processingCtxCancel := context.WithTimeout(crsm.ctx, processingTimeout) + defer processingCtxCancel() + + readResultsFromProcessor := func() { + // ProcessResults is reading responses while blocking until the conditions are met + crsm.parentRelayProcessor.WaitForResults(processingCtx) + // Decide if we need to resend or not + if crsm.parentRelayProcessor.HasRequiredNodeResults() { + gotResults <- true + } else { + gotResults <- false + } + } + go readResultsFromProcessor() + returnCondition := make(chan error, 1) + // Used for checking whether to return an error to the user or to allow other channels return their result 
first see detailed description on the switch case below + validateReturnCondition := func(err error) { + currentlyUsedIsEmptyCounter := 0 + for validateNoProvidersAreUsed := 0; validateNoProvidersAreUsed < numberOfTimesToCheckCurrentlyUsedIsEmpty; validateNoProvidersAreUsed++ { + if crsm.usedProviders.CurrentlyUsed() == 0 { + currentlyUsedIsEmptyCounter++ + } + time.Sleep(5 * time.Millisecond) + } + // we failed to send a batch of relays, if there are no active sends we can terminate after validating X amount of times to make sure no racing channels + if currentlyUsedIsEmptyCounter >= numberOfTimesToCheckCurrentlyUsedIsEmpty { + returnCondition <- err + } + } + + // Send First Message, with analytics and without waiting for batch update. + relayTaskChannel <- RelayStateSendInstructions{ + protocolMessage: crsm.GetProtocolMessage(), + analytics: crsm.analytics, + } + + // Initialize parameters + startNewBatchTicker := time.NewTicker(relayTimeout) // Every relay timeout we send a new batch + defer startNewBatchTicker.Stop() + consecutiveBatchErrors := 0 + + // Start the relay state machine + for { + select { + // Getting batch update for either errors sending message or successful batches + case err := <-crsm.batchUpdate: + if err != nil { // Error handling + // Sending a new batch failed (consumer's protocol side), handling the state machine + consecutiveBatchErrors++ // Increase consecutive error counter + if consecutiveBatchErrors > SendRelayAttempts { // If we failed sending a message more than "SendRelayAttempts" time in a row. + if batchNumber == 0 && consecutiveBatchErrors == SendRelayAttempts+1 { // First relay attempt. print on first failure only. + utils.LavaFormatWarning("Failed Sending First Message", err, utils.LogAttr("consecutive errors", consecutiveBatchErrors)) + } + go validateReturnCondition(err) // Check if we have ongoing messages pending return. + } else { + // Failed sending message, but we still want to attempt sending more. + relayTaskChannel <- RelayStateSendInstructions{ + protocolMessage: crsm.GetProtocolMessage(), + } + } + continue + } + // Successfully sent message. + batchNumber++ + // Reset consecutiveBatchErrors + consecutiveBatchErrors = 0 + // Batch number validation, should never happen. + if batchNumber != crsm.usedProviders.BatchNumber() { + // Mismatch, return error + relayTaskChannel <- RelayStateSendInstructions{ + err: utils.LavaFormatError("Batch Number mismatch between state machine and used providers", nil, utils.LogAttr("batchNumber", batchNumber), utils.LogAttr("crsm.parentRelayProcessor.usedProviders.BatchNumber()", crsm.usedProviders.BatchNumber())), + done: true, + } + return + } + case success := <-gotResults: + // If we had a successful result return what we currently have + // Or we are done sending relays, and we have no other relays pending results. + if success { // Check wether we can return the valid results or we need to send another relay + relayTaskChannel <- RelayStateSendInstructions{done: true} + return + } + // If should retry == true, send a new batch. (success == false) + if crsm.ShouldRetry(batchNumber) { + relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} + } else { + go validateReturnCondition(nil) + } + go readResultsFromProcessor() + case <-startNewBatchTicker.C: + // Only trigger another batch for non BestResult relays or if we didn't pass the retry limit. 
+ if crsm.ShouldRetry(batchNumber) { + relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} + // Add ticker launch metrics + go crsm.tickerMetricSetter.SetRelaySentByNewBatchTickerMetric(crsm.relaySender.GetChainIdAndApiInterface()) + } + case returnErr := <-returnCondition: + // we use this channel because there could be a race condition between us releasing the provider and about to send the return + // to an error happening on another relay processor's routine. this can cause an error that returns to the user + // if we don't release the case, it will cause the success case condition to not be executed + // detailed scenario: + // sending first relay -> waiting -> sending second relay -> getting an error on the second relay (not returning yet) -> + // -> (in parallel) first relay finished, removing from CurrentlyUsed providers -> checking currently used (on second failed relay) -> returning error instead of the successful relay. + // by releasing the case we allow the channel to be chosen again by the successful case. + relayTaskChannel <- RelayStateSendInstructions{err: returnErr, done: true} + return + case <-processingCtx.Done(): + // In case we got a processing timeout we return context deadline exceeded to the user. + userData := crsm.GetProtocolMessage().GetUserData() + utils.LavaFormatWarning("Relay Got processingCtx timeout", nil, + utils.LogAttr("processingTimeout", processingTimeout), + utils.LogAttr("dappId", userData.DappId), + utils.LogAttr("consumerIp", userData.ConsumerIp), + utils.LogAttr("protocolMessage.GetApi().Name", crsm.GetProtocolMessage().GetApi().Name), + utils.LogAttr("GUID", crsm.ctx), + utils.LogAttr("batchNumber", batchNumber), + utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors), + ) + // returning the context error + relayTaskChannel <- RelayStateSendInstructions{err: processingCtx.Err(), done: true} + return + } + } + }() + return relayTaskChannel +} + +func (crsm *ConsumerRelayStateMachine) UpdateBatch(err error) { + crsm.batchUpdate <- err +} diff --git a/protocol/rpcconsumer/consumer_relay_state_machine_test.go b/protocol/rpcconsumer/consumer_relay_state_machine_test.go new file mode 100644 index 0000000000..c42d003714 --- /dev/null +++ b/protocol/rpcconsumer/consumer_relay_state_machine_test.go @@ -0,0 +1,152 @@ +package rpcconsumer + +import ( + context "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/lavanet/lava/v3/protocol/chainlib" + "github.com/lavanet/lava/v3/protocol/chainlib/extensionslib" + lavasession "github.com/lavanet/lava/v3/protocol/lavasession" + "github.com/lavanet/lava/v3/protocol/metrics" + spectypes "github.com/lavanet/lava/v3/x/spec/types" + "github.com/stretchr/testify/require" +) + +type ConsumerRelaySenderMock struct { + retValue error + tickerValue time.Duration +} + +func (crsm *ConsumerRelaySenderMock) sendRelayToProvider(ctx context.Context, protocolMessage chainlib.ProtocolMessage, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics) (errRet error) { + return crsm.retValue +} + +func (crsm *ConsumerRelaySenderMock) getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) { + if crsm.tickerValue != 0 { + return time.Second * 50000, crsm.tickerValue + } + return time.Second * 50000, 100 * time.Millisecond +} + +func (crsm *ConsumerRelaySenderMock) GetChainIdAndApiInterface() (string, string) { + return "testUno", "testDos" +} + +func TestConsumerStateMachineHappyFlow(t *testing.T) { 
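+	// Walks the state machine through a protocol error, two node errors and a final successful
+	// response, checking that every intermediate task is still pending and that the final task
+	// yields the processed "ok" reply with an HTTP 200 status.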
+ t.Run("happy", func(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + }) + specId := "LAV1" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + dappId := "dapp" + consumerIp := "123.11" + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp) + consistency := NewConsumerConsistency(specId) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, consistency, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &ConsumerRelaySenderMock{retValue: nil}, protocolMessage, nil, false, relayProcessorMetrics)) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + + relayTaskChannel := relayProcessor.GetRelayTaskChannel() + taskNumber := 0 + for task := range relayTaskChannel { + switch taskNumber { + case 0: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendProtocolError(relayProcessor, "lava@test", time.Millisecond*1, fmt.Errorf("bad")) + case 1: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendNodeError(relayProcessor, "lava2@test", time.Millisecond*1) + case 2: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendNodeError(relayProcessor, "lava2@test", time.Millisecond*1) + case 3: + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) + sendSuccessResp(relayProcessor, "lava4@test", time.Millisecond*1) + case 4: + require.True(t, task.IsDone()) + require.True(t, relayProcessor.HasRequiredNodeResults()) + returnedResult, err := relayProcessor.ProcessingResult() + require.NoError(t, err) + require.Equal(t, string(returnedResult.Reply.Data), "ok") + require.Equal(t, http.StatusOK, returnedResult.StatusCode) + return // end test. 
+ } + taskNumber++ + } + }) +} + +func TestConsumerStateMachineExhaustRetries(t *testing.T) { + t.Run("retries", func(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + }) + specId := "LAV1" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceRest, serverHandler, nil, "../../", nil) + if closeServer != nil { + defer closeServer() + } + require.NoError(t, err) + chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + dappId := "dapp" + consumerIp := "123.11" + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp) + consistency := NewConsumerConsistency(specId) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, consistency, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &ConsumerRelaySenderMock{retValue: nil, tickerValue: 100 * time.Second}, protocolMessage, nil, false, relayProcessorMetrics)) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + + relayTaskChannel := relayProcessor.GetRelayTaskChannel() + taskNumber := 0 + for task := range relayTaskChannel { + switch taskNumber { + case 0, 1, 2, 3: + require.False(t, task.IsDone()) + relayProcessor.UpdateBatch(fmt.Errorf("failed sending message")) + case 4: + require.True(t, task.IsDone()) + require.Error(t, task.err) + return + } + taskNumber++ + } + }) +} diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 3eb50e49b7..75c408985f 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -11,7 +11,6 @@ import ( "sync/atomic" sdktypes "github.com/cosmos/cosmos-sdk/types" - "github.com/lavanet/lava/v3/protocol/chainlib" "github.com/lavanet/lava/v3/protocol/common" "github.com/lavanet/lava/v3/protocol/lavaprotocol" "github.com/lavanet/lava/v3/protocol/lavasession" @@ -47,7 +46,6 @@ type RelayProcessor struct { responses chan *relayResponse requiredSuccesses int lock sync.RWMutex - protocolMessage chainlib.ProtocolMessage guid uint64 selection Selection consumerConsistency *ConsumerConsistency @@ -58,41 +56,38 @@ type RelayProcessor struct { chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter relayRetriesManager *lavaprotocol.RelayRetriesManager ResultsManager + RelayStateMachine } func NewRelayProcessor( ctx context.Context, - usedProviders *lavasession.UsedProviders, requiredSuccesses int, - protocolMessage chainlib.ProtocolMessage, consumerConsistency *ConsumerConsistency, - debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, relayRetriesManager *lavaprotocol.RelayRetriesManager, + relayStateMachine RelayStateMachine, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) - selection := Quorum // select the majority of node responses - if chainlib.GetStateful(protocolMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { - selection = 
BestResult // select the majority of node successes - } if requiredSuccesses <= 0 { utils.LavaFormatFatal("invalid requirement, successes count must be greater than 0", nil, utils.LogAttr("requiredSuccesses", requiredSuccesses)) } - return &RelayProcessor{ - usedProviders: usedProviders, + relayProcessor := &RelayProcessor{ requiredSuccesses: requiredSuccesses, responses: make(chan *relayResponse, MaxCallsPerRelay), // we set it as buffered so it is not blocking ResultsManager: NewResultsManager(guid), - protocolMessage: protocolMessage, guid: guid, - selection: selection, consumerConsistency: consumerConsistency, - debugRelay: debugRelay, + debugRelay: relayStateMachine.GetDebugState(), metricsInf: metricsInf, chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, relayRetriesManager: relayRetriesManager, + RelayStateMachine: relayStateMachine, + selection: relayStateMachine.GetSelection(), + usedProviders: relayStateMachine.GetUsedProviders(), } + relayProcessor.RelayStateMachine.SetRelayProcessor(relayProcessor) + return relayProcessor } // true if we never got an extension. (default value) @@ -117,13 +112,6 @@ func (rp *RelayProcessor) getSkipDataReliability() bool { return rp.skipDataReliability } -func (rp *RelayProcessor) ShouldRetry(numberOfRetriesLaunched int) bool { - if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries { - return false - } - return rp.selection != BestResult -} - func (rp *RelayProcessor) String() string { if rp == nil { return "" @@ -181,7 +169,7 @@ func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool { } func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { - hash, err := rp.protocolMessage.GetRawRequestHash() + hash, err := rp.RelayStateMachine.GetProtocolMessage().GetRawRequestHash() hashString := "" if err == nil { hashString = string(hash) @@ -213,8 +201,8 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // We failed enough times. we need to add this to our hash map so we don't waste time on it again. chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface() utils.LavaFormatWarning("Failed to recover retries on node errors, might be an invalid input", nil, - utils.LogAttr("api", rp.protocolMessage.GetApi().Name), - utils.LogAttr("params", rp.protocolMessage.GetRPCMessage().GetParams()), + utils.LogAttr("api", rp.RelayStateMachine.GetProtocolMessage().GetApi().Name), + utils.LogAttr("params", rp.RelayStateMachine.GetProtocolMessage().GetRPCMessage().GetParams()), utils.LogAttr("chainId", chainId), utils.LogAttr("apiInterface", apiInterface), utils.LogAttr("hash", hash), @@ -262,17 +250,17 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool { } func (rp *RelayProcessor) handleResponse(response *relayResponse) { - nodeError := rp.ResultsManager.SetResponse(response, rp.protocolMessage) + nodeError := rp.ResultsManager.SetResponse(response, rp.RelayStateMachine.GetProtocolMessage()) // send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors. 
if nodeError != nil && rp.selection != BestResult { go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) - utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", nodeError), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name)) + utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", nodeError), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.RelayStateMachine.GetProtocolMessage().GetApi().Name)) } if response != nil && response.relayResult.GetReply().GetLatestBlock() > 0 { // set consumer consistency when possible blockSeen := response.relayResult.GetReply().GetLatestBlock() - rp.consumerConsistency.SetSeenBlock(blockSeen, rp.protocolMessage.GetUserData()) + rp.consumerConsistency.SetSeenBlock(blockSeen, rp.RelayStateMachine.GetProtocolMessage().GetUserData()) } } @@ -305,7 +293,7 @@ func (rp *RelayProcessor) WaitForResults(ctx context.Context) error { return nil } case <-ctx.Done(): - return utils.LavaFormatWarning("cancelled relay processor", nil, utils.LogAttr("total responses", responsesCount)) + return utils.LavaFormatDebug("cancelled relay processor", utils.LogAttr("total responses", responsesCount)) } } } @@ -315,7 +303,7 @@ func (rp *RelayProcessor) responsesQuorum(results []common.RelayResult, quorumSi return nil, errors.New("quorumSize must be greater than zero") } countMap := make(map[string]int) // Map to store the count of each unique result.Reply.Data - deterministic := rp.protocolMessage.GetApi().Category.Deterministic + deterministic := rp.RelayStateMachine.GetProtocolMessage().GetApi().Category.Deterministic var bestQosResult common.RelayResult bestQos := sdktypes.ZeroDec() nilReplies := 0 diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 8a27219861..7bd4f85151 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -24,6 +24,9 @@ func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, a func (romm *relayProcessorMetricsMock) SetNodeErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string) { } +func (romm *relayProcessorMetricsMock) SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string) { +} + func (romm *relayProcessorMetricsMock) SetNodeErrorAttemptMetric(chainId string, apiInterface string) { } @@ -109,9 +112,9 @@ func TestRelayProcessorHappyFlow(t *testing.T) { consumerIp := "123.11" protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp) consistency := NewConsumerConsistency(specId) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, consistency, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, consistency, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) - usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -165,9 +168,9 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { chainMsg, err := 
chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) - usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -208,8 +211,9 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) - usedProviders = relayProcessor.GetUsedProviders() + usedProviders = lavasession.NewUsedProviders(nil) + relayProcessor = NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse = usedProviders.TryLockSelection(ctx) @@ -233,8 +237,9 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) - usedProviders = relayProcessor.GetUsedProviders() + usedProviders = lavasession.NewUsedProviders(nil) + relayProcessor = NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse = usedProviders.TryLockSelection(ctx) @@ -258,8 +263,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) - usedProviders = relayProcessor.GetUsedProviders() + usedProviders = lavasession.NewUsedProviders(nil) + relayProcessor = 
NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse = usedProviders.TryLockSelection(ctx) @@ -309,9 +314,9 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) - usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -349,9 +354,9 @@ func TestRelayProcessorTimeout(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) - usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -402,9 +407,9 @@ func TestRelayProcessorRetry(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) - usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -447,9 +452,9 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, 
extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) - usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -493,8 +498,8 @@ func TestRelayProcessorStatefulApi(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -508,12 +513,21 @@ func TestRelayProcessorStatefulApi(t *testing.T) { go sendNodeError(relayProcessor, "lava2@test", time.Millisecond*20) go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25) go sendSuccessResp(relayProcessor, "lava4@test", time.Millisecond*100) - ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*300) defer cancel() - err = relayProcessor.WaitForResults(ctx) - require.NoError(t, err) + for i := 0; i < 10; i++ { + err := relayProcessor.WaitForResults(ctx) + require.NoError(t, err) + // Decide if we need to resend or not + if relayProcessor.HasRequiredNodeResults() { + break + } + time.Sleep(5 * time.Millisecond) + } resultsOk := relayProcessor.HasResults() require.True(t, resultsOk) + resultsOk = relayProcessor.HasRequiredNodeResults() + require.True(t, resultsOk) protocolErrors := relayProcessor.ProtocolErrors() require.Equal(t, uint64(1), protocolErrors) returnedResult, err := relayProcessor.ProcessingResult() @@ -539,8 +553,8 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, 
relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) @@ -555,8 +569,9 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50) defer cancel() - err = relayProcessor.WaitForResults(ctx) - require.Error(t, err) + for i := 0; i < 2; i++ { + relayProcessor.WaitForResults(ctx) + } resultsOk := relayProcessor.HasResults() require.True(t, resultsOk) protocolErrors := relayProcessor.ProtocolErrors() @@ -584,8 +599,8 @@ func TestRelayProcessorLatest(t *testing.T) { chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance) - usedProviders := relayProcessor.GetUsedProviders() + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor(ctx, 1, nil, relayProcessorMetrics, relayProcessorMetrics, relayRetriesManagerInstance, NewRelayStateMachine(ctx, usedProviders, &RPCConsumerServer{}, protocolMessage, nil, false, relayProcessorMetrics)) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() canUse := usedProviders.TryLockSelection(ctx) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 3dd3cbe0c2..a1f0adfa1d 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -227,7 +227,16 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, protocolMessage chainlib.ProtocolMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor( + ctx, + 1, + rpccs.consumerConsistency, + rpccs.rpcConsumerLogs, + rpccs, + rpccs.relayRetriesManager, + NewRelayStateMachine(ctx, usedProviders, rpccs, protocolMessage, nil, rpccs.debugRelays, rpccs.rpcConsumerLogs), + ) usedProvidersResets := 1 for i := 0; i < retries; i++ { // Check if we even have enough providers to communicate with them all. @@ -418,127 +427,28 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) - var err error - // try sending a relay 3 times. 
if failed return the error - for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { - // record the relay analytics only on the first attempt. - if analytics != nil && retryFirstRelayAttempt > 0 { - analytics = nil - } - err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, analytics) - - // check if we had an error. if we did, try again. - if err == nil { - break - } - utils.LavaFormatWarning("Failed retryFirstRelayAttempt, will retry.", err, utils.LogAttr("attempt", retryFirstRelayAttempt)) - } - - if err != nil { - return relayProcessor, err - } + usedProviders := lavasession.NewUsedProviders(protocolMessage) + relayProcessor := NewRelayProcessor( + ctx, + rpccs.requiredResponses, + rpccs.consumerConsistency, + rpccs.rpcConsumerLogs, + rpccs, + rpccs.relayRetriesManager, + NewRelayStateMachine(ctx, usedProviders, rpccs, protocolMessage, analytics, rpccs.debugRelays, rpccs.rpcConsumerLogs), + ) - // a channel to be notified processing was done, true means we have results and can return - gotResults := make(chan bool) - processingTimeout, relayTimeout := rpccs.getProcessingTimeout(protocolMessage) - if rpccs.debugRelays { - utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout)) - } - // create the processing timeout prior to entering the method so it wont reset every time - processingCtx, processingCtxCancel := context.WithTimeout(ctx, processingTimeout) - defer processingCtxCancel() - - readResultsFromProcessor := func() { - // ProcessResults is reading responses while blocking until the conditions are met - relayProcessor.WaitForResults(processingCtx) - // decide if we need to resend or not - if relayProcessor.HasRequiredNodeResults() { - gotResults <- true - } else { - gotResults <- false + relayTaskChannel := relayProcessor.GetRelayTaskChannel() + for task := range relayTaskChannel { + if task.IsDone() { + return relayProcessor, task.err } + err := rpccs.sendRelayToProvider(ctx, task.protocolMessage, relayProcessor, task.analytics) + relayProcessor.UpdateBatch(err) } - go readResultsFromProcessor() - returnCondition := make(chan error) - // used for checking whether to return an error to the user or to allow other channels return their result first see detailed description on the switch case below - validateReturnCondition := func(err error) { - currentlyUsedIsEmptyCounter := 0 - if err != nil { - for validateNoProvidersAreUsed := 0; validateNoProvidersAreUsed < numberOfTimesToCheckCurrentlyUsedIsEmpty; validateNoProvidersAreUsed++ { - if relayProcessor.usedProviders.CurrentlyUsed() == 0 { - currentlyUsedIsEmptyCounter++ - } - time.Sleep(5 * time.Millisecond) - } - // we failed to send a batch of relays, if there are no active sends we can terminate after validating X amount of times to make sure no racing channels - if currentlyUsedIsEmptyCounter >= numberOfTimesToCheckCurrentlyUsedIsEmpty { - returnCondition <- err - } - } - } - // every relay timeout we send a new batch - startNewBatchTicker := time.NewTicker(relayTimeout) - defer startNewBatchTicker.Stop() - numberOfRetriesLaunched := 0 - for { - select { - case success := <-gotResults: - if success { // check wether we can return the valid results or we need to send another relay - return relayProcessor, nil - } - // if we don't need to retry return what we currently have - if !relayProcessor.ShouldRetry(numberOfRetriesLaunched) { - return 
relayProcessor, nil - } - // otherwise continue sending another relay - err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, relayProcessor, nil) - go validateReturnCondition(err) - go readResultsFromProcessor() - // increase number of retries launched only if we still have pairing available, if we exhausted the list we don't want to break early - // so it will just wait for the entire duration of the relay - if !lavasession.PairingListEmptyError.Is(err) { - numberOfRetriesLaunched++ - } - case <-startNewBatchTicker.C: - // only trigger another batch for non BestResult relays or if we didn't pass the retry limit. - if relayProcessor.ShouldRetry(numberOfRetriesLaunched) { - // limit the number of retries called from the new batch ticker flow. - // if we pass the limit we just wait for the relays we sent to return. - err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, relayProcessor, nil) - go validateReturnCondition(err) - // add ticker launch metrics - go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.GetChainIdAndApiInterface()) - // increase number of retries launched only if we still have pairing available, if we exhausted the list we don't want to break early - // so it will just wait for the entire duration of the relay - if !lavasession.PairingListEmptyError.Is(err) { - numberOfRetriesLaunched++ - } - } - case returnErr := <-returnCondition: - // we use this channel because there could be a race condition between us releasing the provider and about to send the return - // to an error happening on another relay processor's routine. this can cause an error that returns to the user - // if we don't release the case, it will cause the success case condition to not be executed - // detailed scenario: - // sending first relay -> waiting -> sending second relay -> getting an error on the second relay (not returning yet) -> - // -> (in parallel) first relay finished, removing from CurrentlyUsed providers -> checking currently used (on second failed relay) -> returning error instead of the successful relay. - // by releasing the case we allow the channel to be chosen again by the successful case. - return relayProcessor, returnErr - case <-processingCtx.Done(): - // in case we got a processing timeout we return context deadline exceeded to the user. - userData := protocolMessage.GetUserData() - utils.LavaFormatWarning("Relay Got processingCtx timeout", nil, - utils.LogAttr("processingTimeout", processingTimeout), - utils.LogAttr("dappId", userData.DappId), - utils.LogAttr("consumerIp", userData.ConsumerIp), - utils.LogAttr("protocolMessage.GetApi().Name", protocolMessage.GetApi().Name), - utils.LogAttr("GUID", ctx), - utils.LogAttr("relayProcessor", relayProcessor), - ) - return relayProcessor, processingCtx.Err() // returning the context error - } - } + // shouldn't happen. + return relayProcessor, utils.LavaFormatError("ProcessRelaySend channel closed unexpectedly", nil) } func (rpccs *RPCConsumerServer) CreateDappKey(userData common.UserData) string { @@ -1243,7 +1153,15 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context userData := protocolMessage.GetUserData() // We create new protocol message from the old one, but with a new instance of relay request data. 
dataReliabilityProtocolMessage := chainlib.NewProtocolMessage(protocolMessage, nil, relayRequestData, userData.DappId, userData.ConsumerIp) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.relayRetriesManager) + relayProcessorDataReliability := NewRelayProcessor( + ctx, + 1, + rpccs.consumerConsistency, + rpccs.rpcConsumerLogs, + rpccs, + rpccs.relayRetriesManager, + NewRelayStateMachine(ctx, relayProcessor.usedProviders, rpccs, dataReliabilityProtocolMessage, nil, rpccs.debugRelays, rpccs.rpcConsumerLogs), + ) err := rpccs.sendRelayToProvider(ctx, dataReliabilityProtocolMessage, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) diff --git a/protocol/rpcprovider/provider_state_machine.go b/protocol/rpcprovider/provider_state_machine.go index ad2ca35e06..9b386c80f3 100644 --- a/protocol/rpcprovider/provider_state_machine.go +++ b/protocol/rpcprovider/provider_state_machine.go @@ -12,17 +12,17 @@ import ( pairingtypes "github.com/lavanet/lava/v3/x/pairing/types" ) -type RelaySender interface { +type ProviderRelaySender interface { SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) } type ProviderStateMachine struct { relayRetriesManager lavaprotocol.RelayRetriesManagerInf chainId string - relaySender RelaySender + relaySender ProviderRelaySender } -func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf, relaySender RelaySender) *ProviderStateMachine { +func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf, relaySender ProviderRelaySender) *ProviderStateMachine { return &ProviderStateMachine{ relayRetriesManager: relayRetriesManager, chainId: chainId,
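
A condensed, commented view of the consumer-side control flow this patch introduces: the caller wraps a RelayStateMachine in a RelayProcessor and then drains the task channel, sending one batch per task and reporting each send result back through UpdateBatch. This sketch mirrors ProcessRelaySend above; names such as rpccs, protocolMessage and analytics are the surrounding method's receiver and arguments, not new API.

	// Sketch of the caller loop around the new state machine (mirrors ProcessRelaySend above).
	usedProviders := lavasession.NewUsedProviders(protocolMessage)
	relayProcessor := NewRelayProcessor(
		ctx,
		rpccs.requiredResponses,
		rpccs.consumerConsistency,
		rpccs.rpcConsumerLogs, // metrics interface
		rpccs,                 // chain id / api interface getter
		rpccs.relayRetriesManager,
		// rpccs doubles as the ConsumerRelaySender, and rpccs.rpcConsumerLogs as the ticker metric setter.
		NewRelayStateMachine(ctx, usedProviders, rpccs, protocolMessage, analytics, rpccs.debugRelays, rpccs.rpcConsumerLogs),
	)
	for task := range relayProcessor.GetRelayTaskChannel() {
		if task.IsDone() {
			// A usable result is ready, or the state machine gave up (exhausted retries, return condition, or processing timeout).
			return relayProcessor, task.err
		}
		// Send the batch described by this task, then report the outcome so the state machine
		// can decide whether to wait for results, retry, or return an error to the user.
		err := rpccs.sendRelayToProvider(ctx, task.protocolMessage, relayProcessor, task.analytics)
		relayProcessor.UpdateBatch(err)
	}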