Skip to content

Commit

Permalink
added result manager
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Aug 26, 2024
1 parent 3e21543 commit 25fd0f0
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 152 deletions.
192 changes: 40 additions & 152 deletions protocol/rpcconsumer/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavasession"
"github.com/lavanet/lava/v2/utils"
spectypes "github.com/lavanet/lava/v2/x/spec/types"
)

const (
Expand Down Expand Up @@ -44,9 +43,6 @@ type RelayProcessor struct {
usedProviders *lavasession.UsedProviders
responses chan *relayResponse
requiredSuccesses int
nodeResponseErrors RelayErrors
protocolResponseErrors RelayErrors
successResults []common.RelayResult
lock sync.RWMutex
protocolMessage chainlib.ProtocolMessage
guid uint64
Expand All @@ -60,6 +56,7 @@ type RelayProcessor struct {
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter
disableRelayRetry bool
relayRetriesManager *RelayRetriesManager
ResultsManager
}

func NewRelayProcessor(
Expand All @@ -86,8 +83,7 @@ func NewRelayProcessor(
usedProviders: usedProviders,
requiredSuccesses: requiredSuccesses,
responses: make(chan *relayResponse, MaxCallsPerRelay), // we set it as buffered so it is not blocking
nodeResponseErrors: RelayErrors{relayErrors: []RelayError{}},
protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true},
ResultsManager: NewResultsManager(guid),
protocolMessage: protocolMessage,
guid: guid,
selection: selection,
Expand Down Expand Up @@ -133,17 +129,13 @@ func (rp *RelayProcessor) String() string {
if rp == nil {
return ""
}
rp.lock.RLock()
nodeErrors := len(rp.nodeResponseErrors.relayErrors)
protocolErrors := len(rp.protocolResponseErrors.relayErrors)
results := len(rp.successResults)
usedProviders := rp.usedProviders
rp.lock.RUnlock()

usedProviders := rp.GetUsedProviders()

currentlyUsedAddresses := usedProviders.CurrentlyUsedAddresses()
unwantedAddresses := usedProviders.UnwantedAddresses()
return fmt.Sprintf("relayProcessor {results:%d, nodeErrors:%d, protocolErrors:%d,unwantedAddresses: %s,currentlyUsedAddresses:%s}",
results, nodeErrors, protocolErrors, strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";"))
return fmt.Sprintf("relayProcessor {%s, unwantedAddresses: %s,currentlyUsedAddresses:%s}",
rp.ResultsManager.String(), strings.Join(unwantedAddresses, ";"), strings.Join(currentlyUsedAddresses, ";"))
}

func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders {
Expand All @@ -162,34 +154,7 @@ func (rp *RelayProcessor) NodeResults() []common.RelayResult {
return nil
}
rp.readExistingResponses()
rp.lock.RLock()
defer rp.lock.RUnlock()
return rp.nodeResultsInner()
}

// only when locked
func (rp *RelayProcessor) nodeResultsInner() []common.RelayResult {
// start with results and add to them node results
nodeResults := rp.successResults
nodeResults = append(nodeResults, rp.nodeErrors()...)
return nodeResults
}

// only when locked
func (rp *RelayProcessor) nodeErrors() (ret []common.RelayResult) {
for _, relayError := range rp.nodeResponseErrors.relayErrors {
ret = append(ret, relayError.response.relayResult)
}
return ret
}

func (rp *RelayProcessor) ProtocolErrors() uint64 {
if rp == nil {
return 0
}
rp.lock.RLock()
defer rp.lock.RUnlock()
return uint64(len(rp.protocolResponseErrors.relayErrors))
return rp.ResultsManager.NodeResults()
}

func (rp *RelayProcessor) SetResponse(response *relayResponse) {
Expand All @@ -202,82 +167,12 @@ func (rp *RelayProcessor) SetResponse(response *relayResponse) {
rp.responses <- response
}

func (rp *RelayProcessor) setValidResponse(response *relayResponse) {
rp.lock.Lock()
defer rp.lock.Unlock()

// future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply
// we do not modify the chain message data on the consumer, only it's requested block, so we let the provider know it can't put any block height it wants by setting a specific block height
reqBlock, _ := rp.protocolMessage.RequestedBlock()
if reqBlock == spectypes.LATEST_BLOCK {
// TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always
// modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false)
// if !modifiedOnLatestReq {
response.relayResult.Finalized = false // shut down data reliability
// }
}

if response.relayResult.Reply == nil {
utils.LavaFormatError("got to setValidResponse with nil Reply",
response.err,
utils.LogAttr("ProviderInfo", response.relayResult.ProviderInfo),
utils.LogAttr("StatusCode", response.relayResult.StatusCode),
utils.LogAttr("Finalized", response.relayResult.Finalized),
utils.LogAttr("Quorum", response.relayResult.Quorum),
)
return
}
// no error, update the seen block
blockSeen := response.relayResult.Reply.LatestBlock
// nil safe
rp.consumerConsistency.SetSeenBlock(blockSeen, rp.userData.DappId, rp.userData.ConsumerIp)
// on subscribe results, we just append to successful results instead of parsing results because we already have a validation.
if chainlib.IsFunctionTagOfType(rp.protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
rp.successResults = append(rp.successResults, response.relayResult)
return
}

// check response error
foundError, errorMessage := rp.protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode)
if foundError {
// this is a node error, meaning we still didn't get a good response.
// we may choose to wait until there will be a response or timeout happens
// if we decide to wait and timeout happens we will take the majority of response messages
err := fmt.Errorf("%s", errorMessage)
rp.nodeResponseErrors.relayErrors = append(rp.nodeResponseErrors.relayErrors, RelayError{err: err, ProviderInfo: response.relayResult.ProviderInfo, response: response})
// send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors.
if rp.selection != BestResult {
go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface())
utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock))
}
return
}
rp.successResults = append(rp.successResults, response.relayResult)
}

func (rp *RelayProcessor) setErrorResponse(response *relayResponse) {
rp.lock.Lock()
defer rp.lock.Unlock()
utils.LavaFormatDebug("could not send relay to provider", utils.Attribute{Key: "GUID", Value: rp.guid}, utils.Attribute{Key: "provider", Value: response.relayResult.ProviderInfo.ProviderAddress}, utils.Attribute{Key: "error", Value: response.err.Error()})
rp.protocolResponseErrors.relayErrors = append(rp.protocolResponseErrors.relayErrors, RelayError{err: response.err, ProviderInfo: response.relayResult.ProviderInfo, response: response})
}

func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool {
rp.lock.RLock()
defer rp.lock.RUnlock()
resultsCount := len(rp.successResults)
if resultsCount >= rp.requiredSuccesses {
// we have enough successes, we can return
if rp.ResultsManager.RequiredResults(rp.requiredSuccesses, rp.selection) {
return true
}
if rp.selection == Quorum {
// we need a quorum of all node results
nodeErrors := len(rp.nodeResponseErrors.relayErrors)
if nodeErrors+resultsCount >= rp.requiredSuccesses {
// we have enough node results for our quorum
return true
}
}
// check if we got all of the responses
if responsesCount >= rp.usedProviders.SessionsLatestBatch() {
// no active sessions, and we read all the responses, we can return
Expand All @@ -286,19 +181,6 @@ func (rp *RelayProcessor) checkEndProcessing(responsesCount int) bool {
return false
}

// this function defines if we should use the processor to return the result (meaning it has some insight and responses) or just return to the user
func (rp *RelayProcessor) HasResults() bool {
if rp == nil {
return false
}
rp.lock.RLock()
defer rp.lock.RUnlock()
resultsCount := len(rp.successResults)
nodeErrors := len(rp.nodeResponseErrors.relayErrors)
protocolErrors := len(rp.protocolResponseErrors.relayErrors)
return resultsCount+nodeErrors+protocolErrors > 0
}

func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) {
hash, err := rp.protocolMessage.GetRawRequestHash()
hashString := ""
Expand Down Expand Up @@ -350,7 +232,7 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
}
rp.lock.RLock()
defer rp.lock.RUnlock()
resultsCount := len(rp.successResults)
resultsCount, nodeErrors, _ := rp.GetResults()

hash, hashErr := rp.getInputMsgInfoHashString()
if resultsCount >= rp.requiredSuccesses {
Expand All @@ -361,7 +243,6 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
// Check if we need to add node errors retry metrics
if rp.selection == Quorum {
// If nodeErrors length is larger than 0, our retry mechanism was activated. we add our metrics now.
nodeErrors := len(rp.nodeResponseErrors.relayErrors)
if nodeErrors > 0 {
chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()
go rp.metricsInf.SetNodeErrorRecoveredSuccessfullyMetric(chainId, apiInterface, strconv.Itoa(nodeErrors))
Expand All @@ -371,7 +252,6 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
}
if rp.selection == Quorum {
// We need a quorum of all node results
nodeErrors := len(rp.nodeResponseErrors.relayErrors)
if nodeErrors+resultsCount >= rp.requiredSuccesses {
// Retry on node error flow:
return rp.shouldRetryRelay(resultsCount, hashErr, nodeErrors, hash)
Expand All @@ -382,13 +262,17 @@ func (rp *RelayProcessor) HasRequiredNodeResults() bool {
}

func (rp *RelayProcessor) handleResponse(response *relayResponse) {
if response == nil {
return
nodeError := rp.ResultsManager.SetResponse(response, rp.protocolMessage)
// send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors.
if nodeError != nil && rp.selection != BestResult {
go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface())
utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", nodeError), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name))
}
if response.err != nil {
rp.setErrorResponse(response)
} else {
rp.setValidResponse(response)

if response != nil && response.relayResult.GetReply().GetLatestBlock() > 0 {
// set consumer consistency when possible
blockSeen := response.relayResult.GetReply().GetLatestBlock()
rp.consumerConsistency.SetSeenBlock(blockSeen, rp.userData.DappId, rp.userData.ConsumerIp)
}
}

Expand Down Expand Up @@ -501,54 +385,58 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult

rp.lock.RLock()
defer rp.lock.RUnlock()

successResults, nodeErrors, protocolErrors := rp.GetResultsData()
successResultsCount, nodeErrorCount, protocolErrorCount := len(successResults), len(nodeErrors), len(protocolErrors)
// there are enough successes
successResultsCount := len(rp.successResults)
if successResultsCount >= rp.requiredSuccesses {
return rp.responsesQuorum(rp.successResults, rp.requiredSuccesses)
return rp.responsesQuorum(successResults, rp.requiredSuccesses)
}
nodeResults := rp.nodeResultsInner()
// there are not enough successes, let's check if there are enough node errors

if rp.debugRelay {
// adding as much debug info as possible. all successful relays, all node errors and all protocol errors
utils.LavaFormatDebug("[Processing Result] Debug Relay", utils.LogAttr("rp.requiredSuccesses", rp.requiredSuccesses))
utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("len(rp.successResults)", len(rp.successResults)), utils.LogAttr("len(rp.nodeResponseErrors.relayErrors)", len(rp.nodeResponseErrors.relayErrors)), utils.LogAttr("len(rp.protocolResponseErrors.relayErrors)", len(rp.protocolResponseErrors.relayErrors)))
for idx, result := range rp.successResults {
utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("successResultsCount", successResultsCount), utils.LogAttr("nodeErrorCount", nodeErrorCount), utils.LogAttr("protocolErrorCount", protocolErrorCount))
for idx, result := range successResults {
utils.LavaFormatDebug("[Processing Debug] success result", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
}
for idx, result := range rp.nodeResponseErrors.relayErrors {
for idx, result := range nodeErrors {
utils.LavaFormatDebug("[Processing Debug] node result", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
}
for idx, result := range rp.protocolResponseErrors.relayErrors {
for idx, result := range protocolErrors {
utils.LavaFormatDebug("[Processing Debug] protocol error", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
}
}

if len(nodeResults) >= rp.requiredSuccesses {
// there are not enough successes, let's check if there are enough node errors
if successResultsCount+nodeErrorCount >= rp.requiredSuccesses {
if rp.selection == Quorum {
nodeResults := make([]common.RelayResult, 0, len(successResults)+len(nodeErrors))
nodeResults = append(nodeResults, successResults...)
nodeResults = append(nodeResults, nodeErrors...)
return rp.responsesQuorum(nodeResults, rp.requiredSuccesses)
} else if rp.selection == BestResult && successResultsCount > len(rp.nodeResponseErrors.relayErrors) {
} else if rp.selection == BestResult && successResultsCount > nodeErrorCount {
// we have more than half succeeded, and we are success oriented
return rp.responsesQuorum(rp.successResults, (rp.requiredSuccesses+1)/2)
return rp.responsesQuorum(successResults, (rp.requiredSuccesses+1)/2)
}
}
// we don't have enough for a quorum, prefer a node error on protocol errors
if len(rp.nodeResponseErrors.relayErrors) >= rp.requiredSuccesses { // if we have node errors, we prefer returning them over protocol errors.
nodeErr := rp.nodeResponseErrors.GetBestErrorMessageForUser()
if nodeErrorCount >= rp.requiredSuccesses { // if we have node errors, we prefer returning them over protocol errors.
nodeErr := rp.GetBestNodeErrorMessageForUser()
return &nodeErr.response.relayResult, nil
}

// if we got here we trigger a protocol error
returnedResult = &common.RelayResult{StatusCode: http.StatusInternalServerError}
if len(rp.nodeResponseErrors.relayErrors) > 0 { // if we have node errors, we prefer returning them over protocol errors, even if it's just the one
nodeErr := rp.nodeResponseErrors.GetBestErrorMessageForUser()
if nodeErrorCount > 0 { // if we have node errors, we prefer returning them over protocol errors, even if it's just the one
nodeErr := rp.GetBestNodeErrorMessageForUser()
processingError = nodeErr.err
errorResponse := nodeErr.response
if errorResponse != nil {
returnedResult = &errorResponse.relayResult
}
} else if len(rp.protocolResponseErrors.relayErrors) > 0 {
protocolErr := rp.protocolResponseErrors.GetBestErrorMessageForUser()
} else if protocolErrorCount > 0 {
protocolErr := rp.GetBestProtocolErrorMessageForUser()
processingError = protocolErr.err
errorResponse := protocolErr.response
if errorResponse != nil {
Expand Down
Loading

0 comments on commit 25fd0f0

Please sign in to comment.