PRT-improve-timeouts-on-failed-providers (#982)
* fix cache not allowing jsonrpc batches or string ids

* improve timeout handling and gRPC client state verification; add the ability to override timeouts by spec and by user

* bump version

* lint

* fix optimizer timeout not having enough effect

* give better default scores to providers

* fixed all issues
omerlavanet authored Nov 20, 2023
1 parent 12f6f8c commit 28c8b54
Showing 20 changed files with 260 additions and 99 deletions.
3 changes: 2 additions & 1 deletion cookbook/specs/spec_add_ethereum.json
@@ -205,7 +205,8 @@
"subscription": false,
"stateful": 0
},
"extra_compute_units": 0
"extra_compute_units": 0,
"timeout_ms":1000
},
{
"name": "eth_getBalance",
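
A minimal standalone Go sketch (not part of the diff) of what the new "timeout_ms": 1000 above buys: it replaces the CU-based addition in the GetRelayTimeout formula added further down in protocol/chainlib/common.go. The 300 ms world latency is assumed from the SDK constant of the same name, and the hanging-API block-time extra is omitted.

package main

import (
	"fmt"
	"time"
)

// Illustrative only: mirrors the GetRelayTimeout formula added in this commit,
// with the hanging-API extra omitted and averageWorldLatency assumed to match
// the SDK's 300ms constant.
func effectiveTimeout(specTimeoutMs uint64, timeouts int) time.Duration {
	averageWorldLatency := 300 * time.Millisecond
	relayTimeAddition := time.Millisecond * time.Duration(specTimeoutMs)
	return time.Duration(timeouts+1)*relayTimeAddition + averageWorldLatency
}

func main() {
	// With "timeout_ms": 1000 the first attempt gets 1.3s, and every attempt
	// that already failed on timeout buys the next one another second.
	for timeouts := 0; timeouts < 3; timeouts++ {
		fmt.Println(timeouts, effectiveTimeout(1000, timeouts))
	}
}
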
32 changes: 31 additions & 1 deletion ecosystem/lava-sdk/src/common/timeout.ts
@@ -1,9 +1,19 @@
import { BaseChainParser } from "../chainlib/base_chain_parser";
import { BaseChainMessageContainer } from "../chainlib/chain_message";
import {
IsHangingApi,
GetComputeUnits,
} from "../chainlib/chain_message_queries";

export const TimePerCU = 100; // ms
export const MinimumTimePerRelayDelay = 1000; // ms
export const AverageWorldLatency = 300; // ms

export function getTimePerCu(computeUnits: number): number {
return localNodeTimePerCu(computeUnits) + MinimumTimePerRelayDelay;
if (localNodeTimePerCu(computeUnits) < MinimumTimePerRelayDelay) {
return MinimumTimePerRelayDelay;
}
return localNodeTimePerCu(computeUnits);
}

function localNodeTimePerCu(computeUnits: number): number {
@@ -13,3 +23,23 @@ function localNodeTimePerCu(computeUnits: number): number {
export function baseTimePerCU(computeUnits: number): number {
return computeUnits * TimePerCU;
}

export function GetRelayTimeout(
chainMessage: BaseChainMessageContainer,
chainParser: BaseChainParser,
timeouts: number
): number {
let extraRelayTimeout = 0;
if (IsHangingApi(chainMessage)) {
const chainStats = chainParser.chainBlockStats();
extraRelayTimeout = chainStats.averageBlockTime;
}
let relayTimeAddition = getTimePerCu(GetComputeUnits(chainMessage));
if (chainMessage.getApi().getTimeoutMs() > 0) {
relayTimeAddition = chainMessage.getApi().getTimeoutMs();
}
// Set relay timeout, increase it every time we fail a relay on timeout
return (
extraRelayTimeout + (timeouts + 1) * relayTimeAddition + AverageWorldLatency
);
}
14 changes: 9 additions & 5 deletions ecosystem/lava-sdk/src/providerOptimizer/providerOptimizer.ts
@@ -3,7 +3,11 @@ import { ProviderOptimizer as ProviderOptimizerInterface } from "../lavasession/
import { Logger } from "../logger/logger";
import random from "random";
import gammainc from "@stdlib/math-base-special-gammainc";
import { baseTimePerCU, getTimePerCu } from "../common/timeout";
import {
AverageWorldLatency,
baseTimePerCU,
getTimePerCu,
} from "../common/timeout";
import BigNumber from "bignumber.js";
import { hourInMillis, millisToSeconds, now } from "../util/time";
import { ScoreStore } from "../util/score/decayScore";
@@ -162,7 +166,7 @@ export class ProviderOptimizer implements ProviderOptimizerInterface {
if (latency > 0) {
let baseLatency = this.baseWorldLatency + baseTimePerCU(cu) / 2;
if (isHangingApi) {
baseLatency += this.averageBlockTime;
baseLatency += this.averageBlockTime / 2;
}
providerData = this.updateProbeEntryLatency(
providerData,
@@ -440,7 +444,7 @@ export class ProviderOptimizer implements ProviderOptimizerInterface {
requestedBlock: number
): number {
const baseLatency = this.baseWorldLatency + baseTimePerCU(cu) / 2;
const timeoutDuration = getTimePerCu(cu);
const timeoutDuration = AverageWorldLatency + getTimePerCu(cu);

let historicalLatency = 0;
if (providerData.latency.denom === 0) {
@@ -534,8 +538,8 @@ export class ProviderOptimizer implements ProviderOptimizerInterface {
const time = -1 * INITIAL_DATA_STALENESS * hourInMillis;
data = {
availability: new ScoreStore(0.99, 1, now() + time),
latency: new ScoreStore(2, 1, now() + time),
sync: new ScoreStore(2, 1, now() + time),
latency: new ScoreStore(1, 1, now() + time),
sync: new ScoreStore(1, 1, now() + time),
syncBlock: 0,
};
}
32 changes: 19 additions & 13 deletions ecosystem/lava-sdk/src/rpcconsumer/rpcconsumer_server.ts
@@ -33,7 +33,11 @@ import {
RelayRequest,
} from "../grpc_web_services/lavanet/lava/pairing/relay_pb";
import SDKErrors from "../sdk/errors";
import { AverageWorldLatency, getTimePerCu } from "../common/timeout";
import {
AverageWorldLatency,
GetRelayTimeout,
getTimePerCu,
} from "../common/timeout";
import { FinalizationConsensus } from "../lavaprotocol/finalization_consensus";
import { BACKOFF_TIME_ON_FAILURE, LATEST_BLOCK } from "../common/common";
import { BaseChainMessageContainer } from "../chainlib/chain_message";
@@ -113,6 +117,8 @@ export class RPCConsumerServer {
this.consumerSessionManager.getValidAddresses("", []).size,
MaxRelayRetries
);

let timeouts = 0;
for (
let retries = 0;
retries < maxRetriesAsSizeOfValidAddressesList;
@@ -121,7 +127,8 @@
const relayResult = await this.sendRelayToProvider(
chainMessage,
relayPrivateData,
unwantedProviders
unwantedProviders,
timeouts
);
if (relayResult instanceof Array) {
// relayResult can be an Array of errors from relaying to multiple providers
@@ -134,6 +141,9 @@
} else {
unwantedProviders.add(oneResult.providerAddress);
}
if (oneResult.err == SDKErrors.relayTimeout) {
timeouts++;
}
errors.push(oneResult.err);
}
} else if (relayResult instanceof Error) {
@@ -156,23 +166,19 @@
private async sendRelayToProvider(
chainMessage: BaseChainMessageContainer,
relayData: RelayPrivateData,
unwantedProviders: Set<string>
unwantedProviders: Set<string>,
timeouts: number
): Promise<RelayResult | Array<RelayError> | Error> {
if (IsSubscription(chainMessage)) {
return new Error("subscription currently not supported");
}
const chainID = this.rpcEndpoint.chainId;
const lavaChainId = this.lavaChainId;

let extraRelayTimeout = 0;
if (IsHangingApi(chainMessage)) {
const { averageBlockTime } = this.chainParser.chainBlockStats();
extraRelayTimeout = averageBlockTime;
}
const relayTimeout =
extraRelayTimeout +
getTimePerCu(GetComputeUnits(chainMessage)) +
AverageWorldLatency;
const relayTimeout = GetRelayTimeout(
chainMessage,
this.chainParser,
timeouts
);
const consumerSessionsMap = this.consumerSessionManager.getSessions(
GetComputeUnits(chainMessage),
unwantedProviders,
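
The hunk above only bumps the timeouts counter when a relay failed specifically on timeout, so only genuine timeouts widen the next attempt's deadline. A self-contained Go simulation of that pattern (errRelayTimeout, sendRelay and getRelayTimeout are illustrative stand-ins, not the repository's API):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Illustrative stand-ins: errRelayTimeout plays the role of SDKErrors.relayTimeout
// and getRelayTimeout the role of GetRelayTimeout.
var errRelayTimeout = errors.New("relay timeout")

func getRelayTimeout(timeouts int) time.Duration {
	relayTimeAddition := time.Second // e.g. a spec-level timeout_ms of 1000
	return time.Duration(timeouts+1)*relayTimeAddition + 300*time.Millisecond
}

func sendRelay(ctx context.Context) error {
	select {
	case <-time.After(1500 * time.Millisecond): // simulate a slow provider
		return nil
	case <-ctx.Done():
		return errRelayTimeout
	}
}

func main() {
	timeouts := 0
	for retries := 0; retries < 3; retries++ {
		ctx, cancel := context.WithTimeout(context.Background(), getRelayTimeout(timeouts))
		err := sendRelay(ctx)
		cancel()
		if err == nil {
			fmt.Println("relay succeeded on attempt", retries)
			return
		}
		if errors.Is(err, errRelayTimeout) {
			timeouts++ // only genuine timeouts widen the next attempt's deadline
		}
	}
	fmt.Println("all retries failed")
}

In this simulation the first attempt (1.3 s deadline) times out against the 1.5 s provider, the counter grows, and the second attempt (2.3 s deadline) succeeds.
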
1 change: 1 addition & 0 deletions proto/lavanet/lava/spec/api_collection.proto
@@ -67,6 +67,7 @@ message Api {
uint64 extra_compute_units = 4;
SpecCategory category = 6 [(gogoproto.nullable) = false];
BlockParser block_parsing = 7 [(gogoproto.nullable) = false];
uint64 timeout_ms = 8;
}

message ParseDirective {
9 changes: 9 additions & 0 deletions protocol/chainlib/chain_message.go
@@ -2,6 +2,7 @@ package chainlib

import (
"math"
"time"

"github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcInterfaceMessages"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
@@ -21,6 +22,14 @@ type baseChainMessageContainer struct {
msg updatableRPCInput
apiCollection *spectypes.ApiCollection
extensions []*spectypes.Extension
timeoutOverride time.Duration
}

func (pm *baseChainMessageContainer) TimeoutOverride(override ...time.Duration) time.Duration {
if len(override) > 0 {
pm.timeoutOverride = override[0]
}
return pm.timeoutOverride
}

func (pm *baseChainMessageContainer) DisableErrorHandling() {
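
The new TimeoutOverride method doubles as getter and setter through its variadic parameter: calling it with a duration stores the override, calling it with no arguments reads it back. A standalone mirror of the pattern (the message type below is purely illustrative; the real type is baseChainMessageContainer above):

package main

import (
	"fmt"
	"time"
)

// Standalone mirror of the variadic getter/setter added above.
type message struct {
	timeoutOverride time.Duration
}

func (m *message) TimeoutOverride(override ...time.Duration) time.Duration {
	if len(override) > 0 {
		m.timeoutOverride = override[0]
	}
	return m.timeoutOverride
}

func main() {
	msg := &message{}
	fmt.Println(msg.TimeoutOverride())   // 0s: no override set
	msg.TimeoutOverride(5 * time.Second) // the same method acts as the setter
	fmt.Println(msg.TimeoutOverride())   // 5s: GetRelayTimeout returns this value verbatim
}
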
1 change: 1 addition & 0 deletions protocol/chainlib/chainlib.go
@@ -70,6 +70,7 @@ type ChainMessage interface {
AppendHeader(metadata []pairingtypes.Metadata)
GetExtensions() []*spectypes.Extension
DisableErrorHandling()
TimeoutOverride(...time.Duration) time.Duration
ChainMessageForSend
}

17 changes: 17 additions & 0 deletions protocol/chainlib/common.go
@@ -257,3 +257,20 @@ func CompareRequestedBlockInBatch(firstRequestedBlock int64, second int64) (late
// both are positive
return returnBigger(firstRequestedBlock, second)
}

func GetRelayTimeout(chainMessage ChainMessage, chainParser ChainParser, timeouts int) time.Duration {
if chainMessage.TimeoutOverride() != 0 {
return chainMessage.TimeoutOverride()
}
// Calculate extra RelayTimeout
extraRelayTimeout := time.Duration(0)
if IsHangingApi(chainMessage) {
_, extraRelayTimeout, _, _ = chainParser.ChainBlockStats()
}
relayTimeAddition := common.GetTimePerCu(GetComputeUnits(chainMessage))
if chainMessage.GetApi().TimeoutMs > 0 {
relayTimeAddition = time.Millisecond * time.Duration(chainMessage.GetApi().TimeoutMs)
}
// Set relay timeout, increase it every time we fail a relay on timeout
return extraRelayTimeout + time.Duration(timeouts+1)*relayTimeAddition + common.AverageWorldLatency
}
6 changes: 4 additions & 2 deletions protocol/chainlib/grpc.go
@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -279,8 +280,9 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {
lis := GetListenerWithRetryGrpc("tcp", apil.endpoint.NetworkAddress)
apiInterface := apil.endpoint.ApiInterface
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, metadata.MD, error) {
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
msgSeed := apil.logger.GetMessageSeed()
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
msgSeed := strconv.FormatUint(guid, 10)
metadataValues, _ := metadata.FromIncomingContext(ctx)
startTime := time.Now()
// Extract dappID from grpc header
14 changes: 9 additions & 5 deletions protocol/chainlib/jsonRPC.go
@@ -328,15 +328,16 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
}

ctx, cancel := context.WithCancel(context.Background())
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
msgSeed = strconv.FormatUint(guid, 10)
defer cancel() // in case there's a problem, make sure to cancel the connection
utils.LavaFormatDebug("ws in <<<", utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodPost, dappID, websockConn.RemoteAddr().String(), metricsData, nil)
reply := relayResult.GetReply()
replyServer := relayResult.GetReplyServer()
go apil.logger.AddMetricForWebSocket(metricsData, err, websockConn)

if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(websockConn, messageType, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC, time.Since(startTime))
continue
@@ -390,19 +391,22 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
startTime := time.Now()
endTx := apil.logger.LogStartTransaction("jsonRpc-http post")
defer endTx()
msgSeed := apil.logger.GetMessageSeed()
dappID := extractDappIDFromFiberContext(fiberCtx)
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
msgSeed := strconv.FormatUint(guid, 10)
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: fiberCtx.Body()}, utils.Attribute{Key: "dappID", Value: dappID})
if test_mode {
apil.logger.LogTestMode(fiberCtx)
}

consumerIp := fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP())
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, nil)
metadataValues := fiberCtx.GetReqHeaders()
headers := convertToMetadataMap(metadataValues)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers)
reply := relayResult.GetReply()
if relayResult.GetProvider() != "" {
fiberCtx.Set(common.PROVIDER_ADDRESS_HEADER_NAME, relayResult.GetProvider())
9 changes: 8 additions & 1 deletion protocol/chainlib/rest.go
@@ -267,7 +267,10 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
defer cancel() // in case there's a problem, make sure to cancel the connection

guid, found := utils.GetUniqueIdentifier(ctx)
if found {
msgSeed = strconv.FormatUint(guid, 10)
}
// TODO: handle contentType, in case its not application/json currently we set it to application/json in the Send() method
// contentType := string(c.Context().Request.Header.ContentType())
dappID := extractDappIDFromFiberContext(fiberCtx)
@@ -327,6 +330,10 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
restHeaders := convertToMetadataMap(metadataValues)
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
guid, found := utils.GetUniqueIdentifier(ctx)
if found {
msgSeed = strconv.FormatUint(guid, 10)
}
defer cancel() // in case there's a problem, make sure to cancel the connection
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "dappID", Value: dappID}, utils.Attribute{Key: "msgSeed", Value: msgSeed})
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders)
26 changes: 16 additions & 10 deletions protocol/chainlib/tendermintRPC.go
@@ -356,10 +356,11 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
}

ctx, cancel := context.WithCancel(context.Background())
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
defer cancel() // in case there's a problem, make sure to cancel the connection
utils.LavaFormatInfo("ws in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID})

msgSeed = strconv.FormatUint(guid, 10)
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), "", dappID, websocketConn.RemoteAddr().String(), metricsData, nil)
reply := relayResult.GetReply()
@@ -417,15 +418,17 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
startTime := time.Now()
endTx := apil.logger.LogStartTransaction("tendermint-WebSocket")
defer endTx()
msgSeed := apil.logger.GetMessageSeed()
dappID := extractDappIDFromFiberContext(fiberCtx)
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
defer cancel() // in case there's a problem, make sure to cancel the connection

msgSeed := strconv.FormatUint(guid, 10)
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: fiberCtx.Body()}, utils.Attribute{Key: "dappID", Value: dappID})
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, nil)
metadataValues := fiberCtx.GetReqHeaders()
headers := convertToMetadataMap(metadataValues)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
reply := relayResult.GetReply()
if relayResult.GetProvider() != "" {
fiberCtx.Set(common.PROVIDER_ADDRESS_HEADER_NAME, relayResult.GetProvider())
@@ -471,13 +474,16 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
query := "?" + string(fiberCtx.Request().URI().QueryString())
path := fiberCtx.Params("*")
dappID := extractDappIDFromFiberContext(fiberCtx)
msgSeed := apil.logger.GetMessageSeed()
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
defer cancel() // in case there's a problem, make sure to cancel the connection
utils.LavaFormatInfo("urirpc in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: path}, utils.Attribute{Key: "dappID", Value: dappID})
utils.LavaFormatInfo("urirpc in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: path}, utils.Attribute{Key: "dappID", Value: dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, nil)
metadataValues := fiberCtx.GetReqHeaders()
headers := convertToMetadataMap(metadataValues)
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
msgSeed := strconv.FormatUint(guid, 10)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders())
if relayResult.GetProvider() != "" {
4 changes: 3 additions & 1 deletion protocol/common/endpoints.go
@@ -20,7 +20,9 @@ const (
URL_QUERY_PARAMETERS_SEPARATOR_OTHER_PARAMETERS = "&"
IP_FORWARDING_HEADER_NAME = "X-Forwarded-For"
PROVIDER_ADDRESS_HEADER_NAME = "Lava-Provider-Address"
BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "Lava-Providers-Block"
// these headers need to be lowercase
BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "lava-providers-block"
RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout"
)

type NodeUrl struct {
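
The lava-relay-timeout constant is added here, but the place where it is parsed lies outside this hunk. As a speculative sketch only, assuming the header carries a millisecond value and using a plain string map instead of the real metadata type, a user-supplied header could populate the per-message override consumed by GetRelayTimeout:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// Speculative sketch, not taken from the repository: the header name matches
// RELAY_TIMEOUT_HEADER_NAME above, the millisecond unit is assumed, and a plain
// string map stands in for the real metadata type.
const relayTimeoutHeaderName = "lava-relay-timeout"

func parseRelayTimeoutHeader(headers map[string]string) (time.Duration, bool) {
	value, ok := headers[relayTimeoutHeaderName]
	if !ok || value == "" {
		return 0, false
	}
	ms, err := strconv.ParseUint(value, 10, 64)
	if err != nil {
		return 0, false
	}
	return time.Millisecond * time.Duration(ms), true
}

func main() {
	headers := map[string]string{relayTimeoutHeaderName: "2500"}
	if override, ok := parseRelayTimeoutHeader(headers); ok {
		// something like chainMessage.TimeoutOverride(override) would then
		// short-circuit GetRelayTimeout with this value
		fmt.Println("user-requested timeout:", override)
	}
}
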
5 changes: 4 additions & 1 deletion protocol/common/timeout.go
@@ -26,7 +26,10 @@ func BaseTimePerCU(cu uint64) time.Duration {
}

func GetTimePerCu(cu uint64) time.Duration {
return LocalNodeTimePerCu(cu) + MinimumTimePerRelayDelay
if LocalNodeTimePerCu(cu) < MinimumTimePerRelayDelay {
return MinimumTimePerRelayDelay
}
return LocalNodeTimePerCu(cu)
}

func ContextOutOfTime(ctx context.Context) bool {
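
GetTimePerCu (and its TypeScript mirror above) now treats MinimumTimePerRelayDelay as a floor rather than an unconditional addition, so heavy requests no longer pay an extra second on top of their CU budget. A small before/after comparison, with an assumed 5 ms-per-CU rate standing in for LocalNodeTimePerCu:

package main

import (
	"fmt"
	"time"
)

// Before/after comparison of the GetTimePerCu change above. localNodeTimePerCu
// here uses an assumed 5ms-per-CU rate; the real rate lives in this file.
const minimumTimePerRelayDelay = time.Second

func localNodeTimePerCu(cu uint64) time.Duration {
	return time.Duration(cu) * 5 * time.Millisecond
}

func oldGetTimePerCu(cu uint64) time.Duration { // pre-commit: unconditional addition
	return localNodeTimePerCu(cu) + minimumTimePerRelayDelay
}

func newGetTimePerCu(cu uint64) time.Duration { // post-commit: a one-second floor
	if localNodeTimePerCu(cu) < minimumTimePerRelayDelay {
		return minimumTimePerRelayDelay
	}
	return localNodeTimePerCu(cu)
}

func main() {
	for _, cu := range []uint64{10, 1000} {
		fmt.Printf("cu=%d old=%v new=%v\n", cu, oldGetTimePerCu(cu), newGetTimePerCu(cu))
	}
}
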