From 0c0e4429d9ba5acf6c95d12e3cd130d0a05e6cfc Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Mon, 15 Apr 2024 15:11:30 +0200 Subject: [PATCH] fix: PRT-fix context timeout when sending relay retries (#1376) * add metrics to rpcconsumer * fix lint * fix a bug where the full processing timeout was always passed to each relay attempt, even when less time remained before the context deadline to return the response. * add protection against a very short remaining processing timeout: the per-attempt timeout is never set below the expected relay timeout used for QoS --- protocol/rpcconsumer/rpcconsumer_server.go | 26 ++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index cd5100e3dd..47f88d1c38 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -344,6 +344,11 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH } utils.LavaFormatWarning("Failed retryFirstRelayAttempt, will retry.", err, utils.LogAttr("attempt", retryFirstRelayAttempt)) } + + if err != nil { + return relayProcessor, err + } + // a channel to be notified processing was done, true means we have results and can return gotResults := make(chan bool) processingTimeout, relayTimeout := rpccs.getProcessingTimeout(chainMessage) @@ -388,13 +393,13 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH if success { return relayProcessor, nil } - err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor) + err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor) go validateReturnCondition(err) go readResultsFromProcessor() case <-startNewBatchTicker.C: // only trigger another batch for non BestResult relays if relayProcessor.selection != BestResult { - err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor) + err := 
rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor) go validateReturnCondition(err) // add ticker launch metrics go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.getChainIdAndApiInterface()) @@ -597,7 +602,20 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // unique per dappId and ip consumerToken := common.GetUniqueToken(dappID, consumerIp) - processingTimeout, relayTimeout := rpccs.getProcessingTimeout(chainMessage) + processingTimeout, expectedRelayTimeoutForQOS := rpccs.getProcessingTimeout(chainMessage) + deadline, ok := ctx.Deadline() + if ok { // we have ctx deadline. we cant go past it. + processingTimeout = time.Until(deadline) + if processingTimeout <= 0 { + // no need to send we are out of time + utils.LavaFormatWarning("Creating context deadline for relay attempt ran out of time, processingTimeout <= 0 ", nil, utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("Request data", localRelayRequestData)) + return + } + // to prevent absurdly short context timeout set the shortest timeout to be the expected latency for qos time. 
+ if processingTimeout < expectedRelayTimeoutForQOS { + processingTimeout = expectedRelayTimeoutForQOS + } + } // send relay relayLatency, errResponse, backoff := rpccs.relayInner(goroutineCtx, singleConsumerSession, localRelayResult, processingTimeout, chainMessage, consumerToken) if errResponse != nil { @@ -645,7 +663,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( ) } - errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(chainMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(relayTimeout), expectedBH, numOfProviders, pairingAddressesLen, chainMessage.GetApi().Category.HangingApi) // session done successfully + errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(chainMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, chainMessage.GetApi().Category.HangingApi) // session done successfully if rpccs.cache.CacheActive() && rpcclient.ValidateStatusCodes(localRelayResult.StatusCode, true) == nil { // copy reply data so if it changes it doesn't panic mid async send