diff --git a/protocol/chainlib/protocol_message.go b/protocol/chainlib/protocol_message.go
index 0f6777a04f..d2576c2de2 100644
--- a/protocol/chainlib/protocol_message.go
+++ b/protocol/chainlib/protocol_message.go
@@ -7,6 +7,11 @@ import (
 	pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
 )
 
+type UserData struct {
+	ConsumerIp string
+	DappId     string
+}
+
 type BaseProtocolMessage struct {
 	ChainMessage
 	directiveHeaders map[string]string
diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go
index 8fece3cd9f..e53463335a 100644
--- a/protocol/integration/protocol_test.go
+++ b/protocol/integration/protocol_test.go
@@ -1090,12 +1090,12 @@ func TestSameProviderConflictReport(t *testing.T) {
 		twoProvidersConflictSent := false
 		sameProviderConflictSent := false
-		wg := sync.WaitGroup{}
-		wg.Add(1)
+		numberOfRelays := 10
+		reported := make(chan bool, numberOfRelays)
 		txConflictDetectionMock := func(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error {
 			if finalizationConflict == nil {
 				require.FailNow(t, "Finalization conflict should not be nil")
-				wg.Done()
+				reported <- true
 				return nil
 			}
 			utils.LavaFormatDebug("@@@@@@@@@@@@@@@ Called conflict mock tx", utils.LogAttr("provider0", finalizationConflict.RelayFinalization_0.RelaySession.Provider), utils.LogAttr("provider0", finalizationConflict.RelayFinalization_1.RelaySession.Provider))
@@ -1114,7 +1114,7 @@ func TestSameProviderConflictReport(t *testing.T) {
 			}
 			twoProvidersConflictSent = true
-			wg.Done()
+			reported <- true
 			return nil
 		}
 		mockConsumerStateTracker.SetTxConflictDetectionWrapper(txConflictDetectionMock)
@@ -1137,14 +1137,16 @@ func TestSameProviderConflictReport(t *testing.T) {
 		req, err := http.NewRequest(http.MethodPost, "http://"+consumerListenAddress+"/cosmos/tx/v1beta1/txs", nil)
 		require.NoError(t, err)
 
-		for i := 0; i < 2; i++ {
+		for i := 0; i < numberOfRelays; i++ { // Two relays to trigger both same provider conflict and
-			resp, err := client.Do(req)
-			require.NoError(t, err)
-			require.Equal(t, http.StatusOK, resp.StatusCode)
+			go func() {
+				resp, err := client.Do(req)
+				require.NoError(t, err)
+				require.Equal(t, http.StatusOK, resp.StatusCode)
+			}()
 		}
 		// conflict calls happen concurrently, therefore we need to wait the call.
-		wg.Wait()
+		<-reported
 		require.True(t, sameProviderConflictSent)
 		require.True(t, twoProvidersConflictSent)
 	})
diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go
index 9b806a4d69..397c1301f8 100644
--- a/protocol/lavasession/consumer_session_manager.go
+++ b/protocol/lavasession/consumer_session_manager.go
@@ -832,6 +832,8 @@ func (csm *ConsumerSessionManager) removeAddressFromValidAddresses(address strin
 // Blocks a provider making him unavailable for pick this epoch, will also report him as unavailable if reportProvider is set to true.
 // Validates that the sessionEpoch is equal to cs.currentEpoch otherwise doesn't take effect.
 func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider bool, sessionEpoch uint64, disconnections uint64, errors uint64, allowSecondChance bool, reconnectCallback func() error, errorsForReport []error) error {
+	utils.LavaFormatDebug("CSM Blocking provider", utils.LogAttr("address", address), utils.LogAttr("errorsForReport", errorsForReport), utils.LogAttr("allowing_second_chance", allowSecondChance))
+
 	// find Index of the address
 	if sessionEpoch != csm.atomicReadCurrentEpoch() { // we read here atomically so cs.currentEpoch cant change in the middle, so we can save time if epochs mismatch
 		return EpochMismatchError
@@ -847,10 +849,10 @@ func (csm *ConsumerSessionManager) blockProvider(address string, reportProvider
 		go func() {
 			<-time.After(retrySecondChanceAfter)
 			// check epoch is still relevant, if not just return
-			utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
 			if sessionEpoch != csm.atomicReadCurrentEpoch() {
 				return
 			}
+			utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address))
 			csm.validateAndReturnBlockedProviderToValidAddressesList(address)
 		}()
 	}
@@ -967,6 +969,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
 	if err != nil {
 		return err
 	}
+
 	if !redemptionSession && blockProvider {
 		publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch()
 		err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, consecutiveErrors, allowSecondChance, nil, errorsForConsumerSession)
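The protocol_test.go change above swaps the WaitGroup for a buffered channel: the relays now run in goroutines and the test unblocks as soon as the first conflict report arrives, while the buffer keeps late reports from blocking the mock. A minimal, self-contained sketch of that signaling pattern (hypothetical names, not the actual test harness):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	numberOfRelays := 10
	// Buffered so reports that arrive after the test has moved on never block the sender.
	reported := make(chan bool, numberOfRelays)

	for i := 0; i < numberOfRelays; i++ {
		go func(i int) {
			// Stand-in for client.Do(req) followed by the conflict mock firing.
			time.Sleep(time.Duration(i) * time.Millisecond)
			reported <- true
		}(i)
	}

	<-reported // wait for the first report only
	fmt.Println("first conflict reported")
}
```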
diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go
index 7d66aae584..bc671776b8 100644
--- a/protocol/rpcconsumer/relay_processor.go
+++ b/protocol/rpcconsumer/relay_processor.go
@@ -15,7 +15,6 @@ import (
 	"github.com/lavanet/lava/v2/protocol/common"
 	"github.com/lavanet/lava/v2/protocol/lavasession"
 	"github.com/lavanet/lava/v2/utils"
-	spectypes "github.com/lavanet/lava/v2/x/spec/types"
 )
 
 type Selection int
@@ -46,9 +45,6 @@ type RelayProcessor struct {
 	usedProviders          *lavasession.UsedProviders
 	responses              chan *relayResponse
 	requiredSuccesses      int
-	nodeResponseErrors     RelayErrors
-	protocolResponseErrors RelayErrors
-	successResults         []common.RelayResult
 	lock                   sync.RWMutex
 	protocolMessage        chainlib.ProtocolMessage
 	guid                   uint64
@@ -60,6 +56,7 @@ type RelayProcessor struct {
 	metricsInf                   MetricsInterface
 	chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter
 	relayRetriesManager          *RelayRetriesManager
+	ResultsManager
 }
 
 func NewRelayProcessor(
@@ -85,8 +82,7 @@ func NewRelayProcessor(
 		usedProviders:     usedProviders,
 		requiredSuccesses: requiredSuccesses,
 		responses:         make(chan *relayResponse, MaxCallsPerRelay), // we set it as buffered so it is not blocking
-		nodeResponseErrors:     RelayErrors{relayErrors: []RelayError{}},
-		protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true},
+		ResultsManager:    NewResultsManager(guid),
 		protocolMessage:   protocolMessage,
 		guid:              guid,
 		selection:         selection,
@@ -131,17 +127,13 @@ func (rp *RelayProcessor) String() string {
 	if rp == nil {
 		return ""
 	}
-	rp.lock.RLock()
-	nodeErrors := len(rp.nodeResponseErrors.relayErrors)
-	protocolErrors := len(rp.protocolResponseErrors.relayErrors)
-	results := len(rp.successResults)
-	usedProviders := rp.usedProviders
-	rp.lock.RUnlock()
+
+	usedProviders := rp.GetUsedProviders()
 	currentlyUsedAddresses := usedProviders.CurrentlyUsedAddresses()
 	unwantedAddresses := usedProviders.UnwantedAddresses()
-	return fmt.Sprintf("relayProcessor {results:%d, nodeErrors:%d, protocolErrors:%d,unwantedAddresses: %s,currentlyUsedAddresses:%s}",
-		results, nodeErrors, protocolErrors, strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";"))
+	return fmt.Sprintf("relayProcessor {%s, unwantedAddresses: %s,currentlyUsedAddresses:%s}",
+		rp.ResultsManager.String(), strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";"))
 }
 
 func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders {
@@ -160,34 +152,7 @@ func (rp *RelayProcessor) NodeResults() []common.RelayResult {
 		return nil
 	}
 	rp.readExistingResponses()
-	rp.lock.RLock()
-	defer rp.lock.RUnlock()
-	return rp.nodeResultsInner()
-}
-
-// only when locked
-func (rp *RelayProcessor) nodeResultsInner() []common.RelayResult {
-	// start with results and add to them node results
-	nodeResults := rp.successResults
-	nodeResults = append(nodeResults, rp.nodeErrors()...)
-	return nodeResults
-}
-
-// only when locked
-func (rp *RelayProcessor) nodeErrors() (ret []common.RelayResult) {
-	for _, relayError := range rp.nodeResponseErrors.relayErrors {
-		ret = append(ret, relayError.response.relayResult)
-	}
-	return ret
-}
-
-func (rp *RelayProcessor) ProtocolErrors() uint64 {
-	if rp == nil {
-		return 0
-	}
-	rp.lock.RLock()
-	defer rp.lock.RUnlock()
-	return uint64(len(rp.protocolResponseErrors.relayErrors))
+	return rp.ResultsManager.NodeResults()
 }
 
 func (rp *RelayProcessor) SetResponse(response *relayResponse) {
@@ -200,83 +165,12 @@ func (rp *RelayProcessor) SetResponse(response *relayResponse) {
 	rp.responses <- response
 }
 
-func (rp *RelayProcessor) setValidResponse(response *relayResponse) {
-	rp.lock.Lock()
-	defer rp.lock.Unlock()
-
-	// future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply
-	// we do not modify the chain message data on the consumer, only it's requested block, so we let the provider know it can't put any block height it wants by setting a specific block height
-	reqBlock, _ := rp.protocolMessage.RequestedBlock()
-	if reqBlock == spectypes.LATEST_BLOCK {
-		// TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always
-		// modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false)
-		// if !modifiedOnLatestReq {
-		response.relayResult.Finalized = false // shut down data reliability
-		// }
-	}
-
-	if response.relayResult.Reply == nil {
-		utils.LavaFormatError("got to setValidResponse with nil Reply",
-			response.err,
-			utils.LogAttr("ProviderInfo", response.relayResult.ProviderInfo),
-			utils.LogAttr("StatusCode", response.relayResult.StatusCode),
-			utils.LogAttr("Finalized", response.relayResult.Finalized),
-			utils.LogAttr("Quorum", response.relayResult.Quorum),
-		)
-		return
-	}
-	// no error, update the seen block
-	blockSeen := response.relayResult.Reply.LatestBlock
-	// nil safe
-	userData := rp.protocolMessage.GetUserData()
-	rp.consumerConsistency.SetSeenBlock(blockSeen, userData)
-	// on subscribe results, we just append to successful results instead of parsing results because we already have a validation.
-	if chainlib.IsFunctionTagOfType(rp.protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
-		rp.successResults = append(rp.successResults, response.relayResult)
-		return
-	}
-
-	// check response error
-	foundError, errorMessage := rp.protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode)
-	if foundError {
-		// this is a node error, meaning we still didn't get a good response.
-		// we may choose to wait until there will be a response or timeout happens
-		// if we decide to wait and timeout happens we will take the majority of response messages
-		err := fmt.Errorf("%s", errorMessage)
-		rp.nodeResponseErrors.relayErrors = append(rp.nodeResponseErrors.relayErrors, RelayError{err: err, ProviderInfo: response.relayResult.ProviderInfo, response: response})
-		// send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors.
-		if rp.selection != BestResult {
-			go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface())
-			utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock))
-		}
-		return
-	}
-	rp.successResults = append(rp.successResults, response.relayResult)
-}
-
-func (rp *RelayProcessor) setErrorResponse(response *relayResponse) {
-	rp.lock.Lock()
-	defer rp.lock.Unlock()
-	utils.LavaFormatDebug("could not send relay to provider", utils.Attribute{Key: "GUID", Value: rp.guid}, utils.Attribute{Key: "provider", Value: response.relayResult.ProviderInfo.ProviderAddress}, utils.Attribute{Key: "error", Value: response.err.Error()})
-	rp.protocolResponseErrors.relayErrors = append(rp.protocolResponseErrors.relayErrors, RelayError{err: response.err, ProviderInfo: response.relayResult.ProviderInfo, response: response})
-}
-
 func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool {
 	rp.lock.RLock()
 	defer rp.lock.RUnlock()
-	resultsCount := len(rp.successResults)
-	if resultsCount >= rp.requiredSuccesses {
-		// we have enough successes, we can return
+	if rp.ResultsManager.RequiredResults(rp.requiredSuccesses, rp.selection) {
 		return true
 	}
-	if rp.selection == Quorum {
-		// we need a quorum of all node results
-		nodeErrors := len(rp.nodeResponseErrors.relayErrors)
-		if nodeErrors+resultsCount >= rp.requiredSuccesses {
-			// we have enough node results for our quorum
-			return true
-		}
-	}
 	// check if we got all of the responses
 	if responsesCount >= rp.usedProviders.SessionsLatestBatch() {
 		// no active sessions, and we read all the responses, we can return
@@ -285,19 +179,6 @@ func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool {
 	return false
 }
 
-// this function defines if we should use the processor to return the result (meaning it has some insight and responses) or just return to the user
-func (rp *RelayProcessor) HasResults() bool {
-	if rp == nil {
-		return false
-	}
-	rp.lock.RLock()
-	defer rp.lock.RUnlock()
-	resultsCount := len(rp.successResults)
-	nodeErrors := len(rp.nodeResponseErrors.relayErrors)
-	protocolErrors := len(rp.protocolResponseErrors.relayErrors)
-	return resultsCount+nodeErrors+protocolErrors > 0
-}
-
 func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) {
 	hash, err := rp.protocolMessage.GetRawRequestHash()
 	hashString := ""
@@ -350,7 +231,7 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
 	}
 	rp.lock.RLock()
 	defer rp.lock.RUnlock()
-	resultsCount := len(rp.successResults)
+	resultsCount, nodeErrors, _ := rp.GetResults()
 	hash, hashErr := rp.getInputMsgInfoHashString()
 	if resultsCount >= rp.requiredSuccesses {
@@ -361,7 +242,6 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
 		// Check if we need to add node errors retry metrics
 		if rp.selection == Quorum {
 			// If nodeErrors length is larger than 0, our retry mechanism was activated. we add our metrics now.
-			nodeErrors := len(rp.nodeResponseErrors.relayErrors)
 			if nodeErrors > 0 {
 				chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()
 				go rp.metricsInf.SetNodeErrorRecoveredSuccessfullyMetric(chainId, apiInterface, strconv.Itoa(nodeErrors))
@@ -371,7 +251,6 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
 	}
 	if rp.selection == Quorum {
 		// We need a quorum of all node results
-		nodeErrors := len(rp.nodeResponseErrors.relayErrors)
 		if nodeErrors+resultsCount >= rp.requiredSuccesses {
 			// Retry on node error flow:
 			return rp.shouldRetryRelay(resultsCount, hashErr, nodeErrors, hash)
@@ -382,13 +261,17 @@
 }
 
 func (rp *RelayProcessor) handleResponse(response *relayResponse) {
-	if response == nil {
-		return
+	nodeError := rp.ResultsManager.SetResponse(response, rp.protocolMessage)
+	// send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors.
+	if nodeError != nil && rp.selection != BestResult {
+		go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface())
+		utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", nodeError), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name))
 	}
-	if response.err != nil {
-		rp.setErrorResponse(response)
-	} else {
-		rp.setValidResponse(response)
+
+	if response != nil && response.relayResult.GetReply().GetLatestBlock() > 0 {
+		// set consumer consistency when possible
+		blockSeen := response.relayResult.GetReply().GetLatestBlock()
+		rp.consumerConsistency.SetSeenBlock(blockSeen, rp.protocolMessage.GetUserData())
 	}
 }
@@ -501,54 +384,58 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult
 	rp.lock.RLock()
 	defer rp.lock.RUnlock()
+
+	successResults, nodeErrors, protocolErrors := rp.GetResultsData()
+	successResultsCount, nodeErrorCount, protocolErrorCount := len(successResults), len(nodeErrors), len(protocolErrors)
 	// there are enough successes
-	successResultsCount := len(rp.successResults)
 	if successResultsCount >= rp.requiredSuccesses {
-		return rp.responsesQuorum(rp.successResults, rp.requiredSuccesses)
+		return rp.responsesQuorum(successResults, rp.requiredSuccesses)
 	}
-	nodeResults := rp.nodeResultsInner()
-	// there are not enough successes, let's check if there are enough node errors
 	if rp.debugRelay { // adding as much debug info as possible. all successful relays, all node errors and all protocol errors
 		utils.LavaFormatDebug("[Processing Result] Debug Relay", utils.LogAttr("rp.requiredSuccesses", rp.requiredSuccesses))
-		utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("len(rp.successResults)", len(rp.successResults)), utils.LogAttr("len(rp.nodeResponseErrors.relayErrors)", len(rp.nodeResponseErrors.relayErrors)), utils.LogAttr("len(rp.protocolResponseErrors.relayErrors)", len(rp.protocolResponseErrors.relayErrors)))
-		for idx, result := range rp.successResults {
+		utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("successResultsCount", successResultsCount), utils.LogAttr("nodeErrorCount", nodeErrorCount), utils.LogAttr("protocolErrorCount", protocolErrorCount))
+		for idx, result := range successResults {
 			utils.LavaFormatDebug("[Processing Debug] success result", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
 		}
-		for idx, result := range rp.nodeResponseErrors.relayErrors {
+		for idx, result := range nodeErrors {
 			utils.LavaFormatDebug("[Processing Debug] node result", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
 		}
-		for idx, result := range rp.protocolResponseErrors.relayErrors {
+		for idx, result := range protocolErrors {
 			utils.LavaFormatDebug("[Processing Debug] protocol error", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
 		}
 	}
 
-	if len(nodeResults) >= rp.requiredSuccesses {
+	// there are not enough successes, let's check if there are enough node errors
+	if successResultsCount+nodeErrorCount >= rp.requiredSuccesses {
 		if rp.selection == Quorum {
+			nodeResults := make([]common.RelayResult, 0, len(successResults)+len(nodeErrors))
+			nodeResults = append(nodeResults, successResults...)
+			nodeResults = append(nodeResults, nodeErrors...)
 			return rp.responsesQuorum(nodeResults, rp.requiredSuccesses)
-		} else if rp.selection == BestResult && successResultsCount > len(rp.nodeResponseErrors.relayErrors) {
+		} else if rp.selection == BestResult && successResultsCount > nodeErrorCount {
 			// we have more than half succeeded, and we are success oriented
-			return rp.responsesQuorum(rp.successResults, (rp.requiredSuccesses+1)/2)
+			return rp.responsesQuorum(successResults, (rp.requiredSuccesses+1)/2)
 		}
 	}
 	// we don't have enough for a quorum, prefer a node error on protocol errors
-	if len(rp.nodeResponseErrors.relayErrors) >= rp.requiredSuccesses { // if we have node errors, we prefer returning them over protocol errors.
-		nodeErr := rp.nodeResponseErrors.GetBestErrorMessageForUser()
+	if nodeErrorCount >= rp.requiredSuccesses { // if we have node errors, we prefer returning them over protocol errors.
+		nodeErr := rp.GetBestNodeErrorMessageForUser()
 		return &nodeErr.response.relayResult, nil
 	}
 	// if we got here we trigger a protocol error
 	returnedResult = &common.RelayResult{StatusCode: http.StatusInternalServerError}
-	if len(rp.nodeResponseErrors.relayErrors) > 0 { // if we have node errors, we prefer returning them over protocol errors, even if it's just the one
-		nodeErr := rp.nodeResponseErrors.GetBestErrorMessageForUser()
+	if nodeErrorCount > 0 { // if we have node errors, we prefer returning them over protocol errors, even if it's just the one
+		nodeErr := rp.GetBestNodeErrorMessageForUser()
 		processingError = nodeErr.err
 		errorResponse := nodeErr.response
 		if errorResponse != nil {
 			returnedResult = &errorResponse.relayResult
 		}
-	} else if len(rp.protocolResponseErrors.relayErrors) > 0 {
-		protocolErr := rp.protocolResponseErrors.GetBestErrorMessageForUser()
+	} else if protocolErrorCount > 0 {
+		protocolErr := rp.GetBestProtocolErrorMessageForUser()
 		processingError = protocolErr.err
 		errorResponse := protocolErr.response
 		if errorResponse != nil {
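In relay_processor.go, the per-response bookkeeping moves out of RelayProcessor and the ResultsManager interface is embedded directly in the struct, so calls such as rp.HasResults() or rp.ResultsManager.String() are promoted from the embedded value. A simplified, self-contained illustration of that Go embedding pattern (stand-in types, not the real structs):

```go
package main

import "fmt"

type ResultsManager interface {
	HasResults() bool
	String() string
}

type resultsManagerStub struct{ successes int }

func (r *resultsManagerStub) HasResults() bool { return r.successes > 0 }
func (r *resultsManagerStub) String() string   { return fmt.Sprintf("results:%d", r.successes) }

// Embedding the interface promotes its methods, so the outer type
// satisfies and delegates to whatever manager it was constructed with.
type relayProcessorStub struct {
	ResultsManager
	requiredSuccesses int
}

func main() {
	rp := &relayProcessorStub{ResultsManager: &resultsManagerStub{successes: 1}, requiredSuccesses: 1}
	fmt.Println(rp.HasResults()) // true, delegated to the embedded manager
	fmt.Println(rp.String())     // "results:1"
}
```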
diff --git a/protocol/rpcconsumer/results_manager.go b/protocol/rpcconsumer/results_manager.go
new file mode 100644
index 0000000000..69f421f070
--- /dev/null
+++ b/protocol/rpcconsumer/results_manager.go
@@ -0,0 +1,210 @@
+package rpcconsumer
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/lavanet/lava/v2/protocol/chainlib"
+	common "github.com/lavanet/lava/v2/protocol/common"
+	"github.com/lavanet/lava/v2/utils"
+	spectypes "github.com/lavanet/lava/v2/x/spec/types"
+)
+
+type ResultsManager interface {
+	String() string
+	NodeResults() []common.RelayResult
+	RequiredResults(requiredSuccesses int, selection Selection) bool
+	ProtocolErrors() uint64
+	HasResults() bool
+	GetResults() (success int, nodeErrors int, protocolErrors int)
+	GetResultsData() (successResults []common.RelayResult, nodeErrors []common.RelayResult, protocolErrors []RelayError)
+	SetResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error)
+	GetBestNodeErrorMessageForUser() RelayError
+	GetBestProtocolErrorMessageForUser() RelayError
+	nodeErrors() (ret []common.RelayResult)
+}
+
+type ResultsManagerInst struct {
+	nodeResponseErrors     RelayErrors
+	protocolResponseErrors RelayErrors
+	successResults         []common.RelayResult
+	lock                   sync.RWMutex
+	guid                   uint64
+}
+
+func NewResultsManager(guid uint64) ResultsManager {
+	return &ResultsManagerInst{
+		guid:                   guid,
+		nodeResponseErrors:     RelayErrors{relayErrors: []RelayError{}},
+		protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true},
+	}
+}
+
+func (rp *ResultsManagerInst) setErrorResponse(response *relayResponse) {
+	rp.lock.Lock()
+	defer rp.lock.Unlock()
+	utils.LavaFormatDebug("could not send relay to provider", utils.Attribute{Key: "GUID", Value: rp.guid}, utils.Attribute{Key: "provider", Value: response.relayResult.ProviderInfo.ProviderAddress}, utils.Attribute{Key: "error", Value: response.err.Error()})
+	rp.protocolResponseErrors.relayErrors = append(rp.protocolResponseErrors.relayErrors, RelayError{err: response.err, ProviderInfo: response.relayResult.ProviderInfo, response: response})
+}
+
+// only when locked
+func (rp *ResultsManagerInst) nodeResultsInner() []common.RelayResult {
+	// start with results and add to them node results
+	nodeResults := rp.successResults
+	nodeResults = append(nodeResults, rp.nodeErrors()...)
+	return nodeResults
+}
+
+// only when locked
+func (rp *ResultsManagerInst) nodeErrors() (ret []common.RelayResult) {
+	for _, relayError := range rp.nodeResponseErrors.relayErrors {
+		ret = append(ret, relayError.response.relayResult)
+	}
+	return ret
+}
+
+// returns an error if and only if it was a parsed node error
+func (rp *ResultsManagerInst) setValidResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error) {
+	rp.lock.Lock()
+	defer rp.lock.Unlock()
+
+	// future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply
+	// we do not modify the chain message data on the consumer, only its requested block, so we let the provider know it can't put any block height it wants by setting a specific block height
+	reqBlock, _ := protocolMessage.RequestedBlock()
+	if reqBlock == spectypes.LATEST_BLOCK {
+		// TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always
+		// modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false)
+		// if !modifiedOnLatestReq {
+		response.relayResult.Finalized = false // shut down data reliability
+		// }
+	}
+
+	if response.relayResult.Reply == nil {
+		utils.LavaFormatError("got to setValidResponse with nil Reply",
+			response.err,
+			utils.LogAttr("ProviderInfo", response.relayResult.ProviderInfo),
+			utils.LogAttr("StatusCode", response.relayResult.StatusCode),
+			utils.LogAttr("Finalized", response.relayResult.Finalized),
+			utils.LogAttr("Quorum", response.relayResult.Quorum),
+		)
+		return nil
+	}
+
+	// on subscribe results, we just append to successful results instead of parsing results because we already have a validation.
+	if chainlib.IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
+		rp.successResults = append(rp.successResults, response.relayResult)
+		return nil
+	}
+
+	// check response error
+	foundError, errorMessage := protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode)
+	if foundError {
+		// this is a node error, meaning we still didn't get a good response.
+		// we may choose to wait until there will be a response or timeout happens
+		// if we decide to wait and timeout happens we will take the majority of response messages
+		err := fmt.Errorf("%s", errorMessage)
+		rp.nodeResponseErrors.relayErrors = append(rp.nodeResponseErrors.relayErrors, RelayError{err: err, ProviderInfo: response.relayResult.ProviderInfo, response: response})
+		return err
+	}
+	rp.successResults = append(rp.successResults, response.relayResult)
+	return nil
+}
+
+func (rm *ResultsManagerInst) GetResults() (success int, nodeErrors int, protocolErrors int) {
+	rm.lock.RLock()
+	defer rm.lock.RUnlock()
+
+	nodeErrors = len(rm.nodeResponseErrors.relayErrors)
+	protocolErrors = len(rm.protocolResponseErrors.relayErrors)
+	success = len(rm.successResults)
+	return success, nodeErrors, protocolErrors
+}
+
+func (rm *ResultsManagerInst) String() string {
+	results, nodeErrors, protocolErrors := rm.GetResults()
+	return fmt.Sprintf("resultsManager {success %d, nodeErrors:%d, protocolErrors:%d}", results, nodeErrors, protocolErrors)
+}
+
+// this function returns all results that came from a node, meaning success, and node errors
+func (rp *ResultsManagerInst) NodeResults() []common.RelayResult {
+	if rp == nil {
+		return nil
+	}
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	return rp.nodeResultsInner()
+}
+
+func (rp *ResultsManagerInst) ProtocolErrors() uint64 {
+	if rp == nil {
+		return 0
+	}
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	return uint64(len(rp.protocolResponseErrors.relayErrors))
+}
+
+func (rp *ResultsManagerInst) RequiredResults(requiredSuccesses int, selection Selection) bool {
+	if rp == nil {
+		return false
+	}
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	resultsCount := len(rp.successResults)
+	if resultsCount >= requiredSuccesses {
+		// we have enough successes, we can return
+		return true
+	}
+	if selection == Quorum {
+		// we need a quorum of all node results
+		nodeErrors := len(rp.nodeResponseErrors.relayErrors)
+		if nodeErrors+resultsCount >= requiredSuccesses {
+			// we have enough node results for our quorum
+			return true
+		}
+	}
+	return false
+}
+
+// this function defines if we should use the manager to return the result (meaning it has some insight and responses) or just return to the user
+func (rp *ResultsManagerInst) HasResults() bool {
+	if rp == nil {
+		return false
+	}
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	resultsCount := len(rp.successResults)
+	nodeErrors := len(rp.nodeResponseErrors.relayErrors)
+	protocolErrors := len(rp.protocolResponseErrors.relayErrors)
+	return resultsCount+nodeErrors+protocolErrors > 0
+}
+
+func (rp *ResultsManagerInst) SetResponse(response *relayResponse, protocolMessage chainlib.ProtocolMessage) (nodeError error) {
+	if response == nil {
+		return nil
+	}
+	if response.err != nil {
+		rp.setErrorResponse(response)
+	} else {
+		return rp.setValidResponse(response, protocolMessage)
+	}
+	return nil
+}
+
+func (rp *ResultsManagerInst) GetResultsData() (successResults []common.RelayResult, nodeErrors []common.RelayResult, protocolErrors []RelayError) {
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	return rp.successResults, rp.nodeErrors(), rp.protocolResponseErrors.relayErrors
+}
+
+func (rp *ResultsManagerInst) GetBestNodeErrorMessageForUser() RelayError {
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	return rp.nodeResponseErrors.GetBestErrorMessageForUser()
+}
+
+func (rp *ResultsManagerInst) GetBestProtocolErrorMessageForUser() RelayError {
+	rp.lock.RLock()
+	defer rp.lock.RUnlock()
+	return rp.protocolResponseErrors.GetBestErrorMessageForUser()
+}
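For readers skimming the new file: the manager keeps three buckets behind a read-write lock (successes, node errors, protocol errors), and the quorum decision just reads those counters. A stripped-down sketch of that accounting, with hypothetical names and none of the Lava types:

```go
package main

import (
	"fmt"
	"sync"
)

// resultsTally mirrors the bookkeeping idea: three counters behind an RWMutex.
type resultsTally struct {
	lock                                  sync.RWMutex
	successes, nodeErrors, protocolErrors int
}

func (t *resultsTally) add(success, nodeErr bool) {
	t.lock.Lock()
	defer t.lock.Unlock()
	switch {
	case success:
		t.successes++
	case nodeErr:
		t.nodeErrors++
	default:
		t.protocolErrors++
	}
}

// requiredResults mimics the quorum rule: enough successes alone, or, under
// quorum selection, successes plus node errors reaching the threshold.
func (t *resultsTally) requiredResults(required int, quorumSelection bool) bool {
	t.lock.RLock()
	defer t.lock.RUnlock()
	if t.successes >= required {
		return true
	}
	return quorumSelection && t.successes+t.nodeErrors >= required
}

func main() {
	t := &resultsTally{}
	t.add(true, false) // one success
	t.add(false, true) // one node error
	fmt.Println(t.requiredResults(2, true))  // true: success + node error reach the threshold
	fmt.Println(t.requiredResults(2, false)) // false: not enough plain successes
}
```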