Skip to content

Commit

Permalink
fix: PRT-fix context timeout when sending relay retries (#1376)
Browse files Browse the repository at this point in the history
* add metrics to rpcconsumer

* fix lint

* fix a bug where processing timeout was always provided to relay even though we had less time to return the response.

* adding a protection for super low processing timeout left
  • Loading branch information
ranlavanet authored Apr 15, 2024
1 parent e90127e commit 0c0e442
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
}
utils.LavaFormatWarning("Failed retryFirstRelayAttempt, will retry.", err, utils.LogAttr("attempt", retryFirstRelayAttempt))
}

if err != nil {
return relayProcessor, err
}

// a channel to be notified processing was done, true means we have results and can return
gotResults := make(chan bool)
processingTimeout, relayTimeout := rpccs.getProcessingTimeout(chainMessage)
Expand Down Expand Up @@ -388,13 +393,13 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
if success {
return relayProcessor, nil
}
err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
go validateReturnCondition(err)
go readResultsFromProcessor()
case <-startNewBatchTicker.C:
// only trigger another batch for non BestResult relays
if relayProcessor.selection != BestResult {
err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
go validateReturnCondition(err)
// add ticker launch metrics
go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.getChainIdAndApiInterface())
Expand Down Expand Up @@ -597,7 +602,20 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(

// unique per dappId and ip
consumerToken := common.GetUniqueToken(dappID, consumerIp)
processingTimeout, relayTimeout := rpccs.getProcessingTimeout(chainMessage)
processingTimeout, expectedRelayTimeoutForQOS := rpccs.getProcessingTimeout(chainMessage)
deadline, ok := ctx.Deadline()
if ok { // we have ctx deadline. we cant go past it.
processingTimeout = time.Until(deadline)
if processingTimeout <= 0 {
// no need to send we are out of time
utils.LavaFormatWarning("Creating context deadline for relay attempt ran out of time, processingTimeout <= 0 ", nil, utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("Request data", localRelayRequestData))
return
}
// to prevent absurdly short context timeout set the shortest timeout to be the expected latency for qos time.
if processingTimeout < expectedRelayTimeoutForQOS {
processingTimeout = expectedRelayTimeoutForQOS
}
}
// send relay
relayLatency, errResponse, backoff := rpccs.relayInner(goroutineCtx, singleConsumerSession, localRelayResult, processingTimeout, chainMessage, consumerToken)
if errResponse != nil {
Expand Down Expand Up @@ -645,7 +663,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
)
}

errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(chainMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(relayTimeout), expectedBH, numOfProviders, pairingAddressesLen, chainMessage.GetApi().Category.HangingApi) // session done successfully
errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(chainMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, chainMessage.GetApi().Category.HangingApi) // session done successfully

if rpccs.cache.CacheActive() && rpcclient.ValidateStatusCodes(localRelayResult.StatusCode, true) == nil {
// copy reply data so if it changes it doesn't panic mid async send
Expand Down

0 comments on commit 0c0e442

Please sign in to comment.