From 28c8b5459fdaa286d86d7c51e1d2625879583348 Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Mon, 20 Nov 2023 14:55:58 +0200 Subject: [PATCH] PRT-improve-timeouts-on-failed-providers (#982) * fix cache not allowing jsonrpc batches or string ids * improve timeout handling, grpc client state verifications, added the ability to override timeouts by spec and user * bump version * lint * fix optimizer timeout not affecting enough * give better default scores to providers * fixed all issues --- cookbook/specs/spec_add_ethereum.json | 3 +- ecosystem/lava-sdk/src/common/timeout.ts | 32 ++++- .../providerOptimizer/providerOptimizer.ts | 14 ++- .../src/rpcconsumer/rpcconsumer_server.ts | 32 +++-- proto/lavanet/lava/spec/api_collection.proto | 1 + protocol/chainlib/chain_message.go | 9 ++ protocol/chainlib/chainlib.go | 1 + protocol/chainlib/common.go | 17 +++ protocol/chainlib/grpc.go | 6 +- protocol/chainlib/jsonRPC.go | 14 ++- protocol/chainlib/rest.go | 9 +- protocol/chainlib/tendermintRPC.go | 26 ++-- protocol/common/endpoints.go | 4 +- protocol/common/timeout.go | 5 +- protocol/lavasession/common.go | 3 +- protocol/lavasession/consumer_types.go | 29 ++++- .../provideroptimizer/provider_optimizer.go | 8 +- protocol/rpcconsumer/rpcconsumer_server.go | 31 +++-- x/protocol/types/params.go | 4 +- x/spec/types/api_collection.pb.go | 111 ++++++++++++------ 20 files changed, 260 insertions(+), 99 deletions(-) diff --git a/cookbook/specs/spec_add_ethereum.json b/cookbook/specs/spec_add_ethereum.json index fbf896976d..e8da015f45 100644 --- a/cookbook/specs/spec_add_ethereum.json +++ b/cookbook/specs/spec_add_ethereum.json @@ -205,7 +205,8 @@ "subscription": false, "stateful": 0 }, - "extra_compute_units": 0 + "extra_compute_units": 0, + "timeout_ms":1000 }, { "name": "eth_getBalance", diff --git a/ecosystem/lava-sdk/src/common/timeout.ts b/ecosystem/lava-sdk/src/common/timeout.ts index fcfe2a2ccc..e689d225b0 100644 --- a/ecosystem/lava-sdk/src/common/timeout.ts +++ b/ecosystem/lava-sdk/src/common/timeout.ts @@ -1,9 +1,19 @@ +import { BaseChainParser } from "../chainlib/base_chain_parser"; +import { BaseChainMessageContainer } from "../chainlib/chain_message"; +import { + IsHangingApi, + GetComputeUnits, +} from "../chainlib/chain_message_queries"; + export const TimePerCU = 100; // ms export const MinimumTimePerRelayDelay = 1000; // ms export const AverageWorldLatency = 300; // ms export function getTimePerCu(computeUnits: number): number { - return localNodeTimePerCu(computeUnits) + MinimumTimePerRelayDelay; + if (localNodeTimePerCu(computeUnits) < MinimumTimePerRelayDelay) { + return MinimumTimePerRelayDelay; + } + return localNodeTimePerCu(computeUnits); } function localNodeTimePerCu(computeUnits: number): number { @@ -13,3 +23,23 @@ function localNodeTimePerCu(computeUnits: number): number { export function baseTimePerCU(computeUnits: number): number { return computeUnits * TimePerCU; } + +export function GetRelayTimeout( + chainMessage: BaseChainMessageContainer, + chainParser: BaseChainParser, + timeouts: number +): number { + let extraRelayTimeout = 0; + if (IsHangingApi(chainMessage)) { + const chainStats = chainParser.chainBlockStats(); + extraRelayTimeout = chainStats.averageBlockTime; + } + let relayTimeAddition = getTimePerCu(GetComputeUnits(chainMessage)); + if (chainMessage.getApi().getTimeoutMs() > 0) { + relayTimeAddition = chainMessage.getApi().getTimeoutMs(); + } + // Set relay timout, increase it every time we fail a relay on timeout + return ( + extraRelayTimeout + (timeouts + 1) * relayTimeAddition + AverageWorldLatency + ); +} diff --git a/ecosystem/lava-sdk/src/providerOptimizer/providerOptimizer.ts b/ecosystem/lava-sdk/src/providerOptimizer/providerOptimizer.ts index bd4e050dae..99ee80de1e 100644 --- a/ecosystem/lava-sdk/src/providerOptimizer/providerOptimizer.ts +++ b/ecosystem/lava-sdk/src/providerOptimizer/providerOptimizer.ts @@ -3,7 +3,11 @@ import { ProviderOptimizer as ProviderOptimizerInterface } from "../lavasession/ import { Logger } from "../logger/logger"; import random from "random"; import gammainc from "@stdlib/math-base-special-gammainc"; -import { baseTimePerCU, getTimePerCu } from "../common/timeout"; +import { + AverageWorldLatency, + baseTimePerCU, + getTimePerCu, +} from "../common/timeout"; import BigNumber from "bignumber.js"; import { hourInMillis, millisToSeconds, now } from "../util/time"; import { ScoreStore } from "../util/score/decayScore"; @@ -162,7 +166,7 @@ export class ProviderOptimizer implements ProviderOptimizerInterface { if (latency > 0) { let baseLatency = this.baseWorldLatency + baseTimePerCU(cu) / 2; if (isHangingApi) { - baseLatency += this.averageBlockTime; + baseLatency += this.averageBlockTime / 2; } providerData = this.updateProbeEntryLatency( providerData, @@ -440,7 +444,7 @@ export class ProviderOptimizer implements ProviderOptimizerInterface { requestedBlock: number ): number { const baseLatency = this.baseWorldLatency + baseTimePerCU(cu) / 2; - const timeoutDuration = getTimePerCu(cu); + const timeoutDuration = AverageWorldLatency + getTimePerCu(cu); let historicalLatency = 0; if (providerData.latency.denom === 0) { @@ -534,8 +538,8 @@ export class ProviderOptimizer implements ProviderOptimizerInterface { const time = -1 * INITIAL_DATA_STALENESS * hourInMillis; data = { availability: new ScoreStore(0.99, 1, now() + time), - latency: new ScoreStore(2, 1, now() + time), - sync: new ScoreStore(2, 1, now() + time), + latency: new ScoreStore(1, 1, now() + time), + sync: new ScoreStore(1, 1, now() + time), syncBlock: 0, }; } diff --git a/ecosystem/lava-sdk/src/rpcconsumer/rpcconsumer_server.ts b/ecosystem/lava-sdk/src/rpcconsumer/rpcconsumer_server.ts index dfb5578ff2..814d961a2e 100644 --- a/ecosystem/lava-sdk/src/rpcconsumer/rpcconsumer_server.ts +++ b/ecosystem/lava-sdk/src/rpcconsumer/rpcconsumer_server.ts @@ -33,7 +33,11 @@ import { RelayRequest, } from "../grpc_web_services/lavanet/lava/pairing/relay_pb"; import SDKErrors from "../sdk/errors"; -import { AverageWorldLatency, getTimePerCu } from "../common/timeout"; +import { + AverageWorldLatency, + GetRelayTimeout, + getTimePerCu, +} from "../common/timeout"; import { FinalizationConsensus } from "../lavaprotocol/finalization_consensus"; import { BACKOFF_TIME_ON_FAILURE, LATEST_BLOCK } from "../common/common"; import { BaseChainMessageContainer } from "../chainlib/chain_message"; @@ -113,6 +117,8 @@ export class RPCConsumerServer { this.consumerSessionManager.getValidAddresses("", []).size, MaxRelayRetries ); + + let timeouts = 0; for ( let retries = 0; retries < maxRetriesAsSizeOfValidAddressesList; @@ -121,7 +127,8 @@ export class RPCConsumerServer { const relayResult = await this.sendRelayToProvider( chainMessage, relayPrivateData, - unwantedProviders + unwantedProviders, + timeouts ); if (relayResult instanceof Array) { // relayResult can be an Array of errors from relaying to multiple providers @@ -134,6 +141,9 @@ export class RPCConsumerServer { } else { unwantedProviders.add(oneResult.providerAddress); } + if (oneResult.err == SDKErrors.relayTimeout) { + timeouts++; + } errors.push(oneResult.err); } } else if (relayResult instanceof Error) { @@ -156,23 +166,19 @@ export class RPCConsumerServer { private async sendRelayToProvider( chainMessage: BaseChainMessageContainer, relayData: RelayPrivateData, - unwantedProviders: Set + unwantedProviders: Set, + timeouts: number ): Promise | Error> { if (IsSubscription(chainMessage)) { return new Error("subscription currently not supported"); } const chainID = this.rpcEndpoint.chainId; const lavaChainId = this.lavaChainId; - - let extraRelayTimeout = 0; - if (IsHangingApi(chainMessage)) { - const { averageBlockTime } = this.chainParser.chainBlockStats(); - extraRelayTimeout = averageBlockTime; - } - const relayTimeout = - extraRelayTimeout + - getTimePerCu(GetComputeUnits(chainMessage)) + - AverageWorldLatency; + const relayTimeout = GetRelayTimeout( + chainMessage, + this.chainParser, + timeouts + ); const consumerSessionsMap = this.consumerSessionManager.getSessions( GetComputeUnits(chainMessage), unwantedProviders, diff --git a/proto/lavanet/lava/spec/api_collection.proto b/proto/lavanet/lava/spec/api_collection.proto index e0256b4a9b..1c8002dcf6 100644 --- a/proto/lavanet/lava/spec/api_collection.proto +++ b/proto/lavanet/lava/spec/api_collection.proto @@ -67,6 +67,7 @@ message Api { uint64 extra_compute_units = 4; SpecCategory category = 6 [(gogoproto.nullable) = false]; BlockParser block_parsing = 7 [(gogoproto.nullable) = false]; + uint64 timeout_ms = 8; } message ParseDirective { diff --git a/protocol/chainlib/chain_message.go b/protocol/chainlib/chain_message.go index cee053c2c8..d9ac745f86 100644 --- a/protocol/chainlib/chain_message.go +++ b/protocol/chainlib/chain_message.go @@ -2,6 +2,7 @@ package chainlib import ( "math" + "time" "github.com/lavanet/lava/protocol/chainlib/chainproxy/rpcInterfaceMessages" pairingtypes "github.com/lavanet/lava/x/pairing/types" @@ -21,6 +22,14 @@ type baseChainMessageContainer struct { msg updatableRPCInput apiCollection *spectypes.ApiCollection extensions []*spectypes.Extension + timeoutOverride time.Duration +} + +func (pm *baseChainMessageContainer) TimeoutOverride(override ...time.Duration) time.Duration { + if len(override) > 0 { + pm.timeoutOverride = override[0] + } + return pm.timeoutOverride } func (pm *baseChainMessageContainer) DisableErrorHandling() { diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 4b36a001be..05773b9086 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -70,6 +70,7 @@ type ChainMessage interface { AppendHeader(metadata []pairingtypes.Metadata) GetExtensions() []*spectypes.Extension DisableErrorHandling() + TimeoutOverride(...time.Duration) time.Duration ChainMessageForSend } diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 01c9c1c266..aa7eb1c641 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -257,3 +257,20 @@ func CompareRequestedBlockInBatch(firstRequestedBlock int64, second int64) (late // both are positive return returnBigger(firstRequestedBlock, second) } + +func GetRelayTimeout(chainMessage ChainMessage, chainParser ChainParser, timeouts int) time.Duration { + if chainMessage.TimeoutOverride() != 0 { + return chainMessage.TimeoutOverride() + } + // Calculate extra RelayTimeout + extraRelayTimeout := time.Duration(0) + if IsHangingApi(chainMessage) { + _, extraRelayTimeout, _, _ = chainParser.ChainBlockStats() + } + relayTimeAddition := common.GetTimePerCu(GetComputeUnits(chainMessage)) + if chainMessage.GetApi().TimeoutMs > 0 { + relayTimeAddition = time.Millisecond * time.Duration(chainMessage.GetApi().TimeoutMs) + } + // Set relay timout, increase it every time we fail a relay on timeout + return extraRelayTimeout + time.Duration(timeouts+1)*relayTimeAddition + common.AverageWorldLatency +} diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index e0c736264b..a007b6f62d 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "sync" "time" @@ -279,8 +280,9 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) { lis := GetListenerWithRetryGrpc("tcp", apil.endpoint.NetworkAddress) apiInterface := apil.endpoint.ApiInterface sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, metadata.MD, error) { - ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - msgSeed := apil.logger.GetMessageSeed() + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) + msgSeed := strconv.FormatUint(guid, 10) metadataValues, _ := metadata.FromIncomingContext(ctx) startTime := time.Now() // Extract dappID from grpc header diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 23e3c4ec66..dfe4a741be 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -328,7 +328,9 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) { } ctx, cancel := context.WithCancel(context.Background()) - ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) + msgSeed = strconv.FormatUint(guid, 10) defer cancel() // incase there's a problem make sure to cancel the connection utils.LavaFormatDebug("ws in <<<", utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID}) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) @@ -336,7 +338,6 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) { reply := relayResult.GetReply() replyServer := relayResult.GetReplyServer() go apil.logger.AddMetricForWebSocket(metricsData, err, websockConn) - if err != nil { apil.logger.AnalyzeWebSocketErrorAndWriteMessage(websockConn, messageType, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC, time.Since(startTime)) continue @@ -390,19 +391,22 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) { startTime := time.Now() endTx := apil.logger.LogStartTransaction("jsonRpc-http post") defer endTx() - msgSeed := apil.logger.GetMessageSeed() dappID := extractDappIDFromFiberContext(fiberCtx) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) + msgSeed := strconv.FormatUint(guid, 10) utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: fiberCtx.Body()}, utils.Attribute{Key: "dappID", Value: dappID}) if test_mode { apil.logger.LogTestMode(fiberCtx) } consumerIp := fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()) - relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, nil) + metadataValues := fiberCtx.GetReqHeaders() + headers := convertToMetadataMap(metadataValues) + relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers) reply := relayResult.GetReply() if relayResult.GetProvider() != "" { fiberCtx.Set(common.PROVIDER_ADDRESS_HEADER_NAME, relayResult.GetProvider()) diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index a0ed4be749..952a559417 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -267,7 +267,10 @@ func (apil *RestChainListener) Serve(ctx context.Context) { ctx, cancel := context.WithCancel(context.Background()) ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) defer cancel() // incase there's a problem make sure to cancel the connection - + guid, found := utils.GetUniqueIdentifier(ctx) + if found { + msgSeed = strconv.FormatUint(guid, 10) + } // TODO: handle contentType, in case its not application/json currently we set it to application/json in the Send() method // contentType := string(c.Context().Request.Header.ContentType()) dappID := extractDappIDFromFiberContext(fiberCtx) @@ -327,6 +330,10 @@ func (apil *RestChainListener) Serve(ctx context.Context) { restHeaders := convertToMetadataMap(metadataValues) ctx, cancel := context.WithCancel(context.Background()) ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + guid, found := utils.GetUniqueIdentifier(ctx) + if found { + msgSeed = strconv.FormatUint(guid, 10) + } defer cancel() // incase there's a problem make sure to cancel the connection utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "dappID", Value: dappID}, utils.Attribute{Key: "msgSeed", Value: msgSeed}) relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders) diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 0dfde97690..c4650ec646 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -356,10 +356,11 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) { } ctx, cancel := context.WithCancel(context.Background()) - ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) defer cancel() // incase there's a problem make sure to cancel the connection utils.LavaFormatInfo("ws in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID}) - + msgSeed = strconv.FormatUint(guid, 10) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), "", dappID, websocketConn.RemoteAddr().String(), metricsData, nil) reply := relayResult.GetReply() @@ -417,15 +418,17 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) { startTime := time.Now() endTx := apil.logger.LogStartTransaction("tendermint-WebSocket") defer endTx() - msgSeed := apil.logger.GetMessageSeed() dappID := extractDappIDFromFiberContext(fiberCtx) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) ctx, cancel := context.WithCancel(context.Background()) - ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) defer cancel() // incase there's a problem make sure to cancel the connection - + msgSeed := strconv.FormatUint(guid, 10) utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: fiberCtx.Body()}, utils.Attribute{Key: "dappID", Value: dappID}) - relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, nil) + metadataValues := fiberCtx.GetReqHeaders() + headers := convertToMetadataMap(metadataValues) + relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers) reply := relayResult.GetReply() if relayResult.GetProvider() != "" { fiberCtx.Set(common.PROVIDER_ADDRESS_HEADER_NAME, relayResult.GetProvider()) @@ -471,13 +474,16 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) { query := "?" + string(fiberCtx.Request().URI().QueryString()) path := fiberCtx.Params("*") dappID := extractDappIDFromFiberContext(fiberCtx) - msgSeed := apil.logger.GetMessageSeed() ctx, cancel := context.WithCancel(context.Background()) - ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) defer cancel() // incase there's a problem make sure to cancel the connection - utils.LavaFormatInfo("urirpc in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: path}, utils.Attribute{Key: "dappID", Value: dappID}) + utils.LavaFormatInfo("urirpc in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: path}, utils.Attribute{Key: "dappID", Value: dappID}) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) - relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, nil) + metadataValues := fiberCtx.GetReqHeaders() + headers := convertToMetadataMap(metadataValues) + relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers) + msgSeed := strconv.FormatUint(guid, 10) reply := relayResult.GetReply() go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders()) if relayResult.GetProvider() != "" { diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 48947651fd..91887d7154 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -20,7 +20,9 @@ const ( URL_QUERY_PARAMETERS_SEPARATOR_OTHER_PARAMETERS = "&" IP_FORWARDING_HEADER_NAME = "X-Forwarded-For" PROVIDER_ADDRESS_HEADER_NAME = "Lava-Provider-Address" - BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "Lava-Providers-Block" + // these headers need to be lowercase + BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "lava-providers-block" + RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout" ) type NodeUrl struct { diff --git a/protocol/common/timeout.go b/protocol/common/timeout.go index 1e1701ed22..d1b08989dd 100644 --- a/protocol/common/timeout.go +++ b/protocol/common/timeout.go @@ -26,7 +26,10 @@ func BaseTimePerCU(cu uint64) time.Duration { } func GetTimePerCu(cu uint64) time.Duration { - return LocalNodeTimePerCu(cu) + MinimumTimePerRelayDelay + if LocalNodeTimePerCu(cu) < MinimumTimePerRelayDelay { + return MinimumTimePerRelayDelay + } + return LocalNodeTimePerCu(cu) } func ContextOutOfTime(ctx context.Context) bool { diff --git a/protocol/lavasession/common.go b/protocol/lavasession/common.go index 027aa1a3c7..349f0cfa94 100644 --- a/protocol/lavasession/common.go +++ b/protocol/lavasession/common.go @@ -59,7 +59,8 @@ func ConnectgRPCClient(ctx context.Context, address string, allowInsecure bool) tlsConf.InsecureSkipVerify = true // this will allow us to use self signed certificates in development. } credentials := credentials.NewTLS(&tlsConf) - return grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithTransportCredentials(credentials)) + conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithTransportCredentials(credentials)) + return conn, err } func GenerateSelfSignedCertificate() (tls.Certificate, error) { diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index d4e7cd69ed..b26b5cf62b 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -278,8 +278,22 @@ func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx contex if err != nil { return nil, nil, err } - /*defer conn.Close()*/ - + ch := make(chan bool) + go func() { + for { + // Check if the connection state is not Connecting + if conn.GetState() == connectivity.Ready { + ch <- true + return + } + // Add some delay to avoid busy-waiting + time.Sleep(20 * time.Millisecond) + } + }() + select { + case <-connectCtx.Done(): + case <-ch: + } c := pairingtypes.NewRelayerClient(conn) return &c, conn, nil } @@ -349,7 +363,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes continue } connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (connected_ bool) { - if endpoint.Client != nil && endpoint.connection.GetState() != connectivity.Shutdown { + if endpoint.Client != nil && endpoint.connection != nil && endpoint.connection.GetState() != connectivity.Shutdown && endpoint.connection.GetState() != connectivity.Idle { return true } client, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.NetworkAddress) @@ -370,18 +384,25 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes endpoint.connection = conn return true } + endpointState := connectivity.Idle + if endpoint.connection != nil { + endpointState = endpoint.connection.GetState() + } if endpoint.Client == nil { connected_ := connectEndpoint(cswp, ctx, endpoint) if !connected_ { continue } - } else if endpoint.connection.GetState() == connectivity.Shutdown { + } else if endpointState == connectivity.Shutdown || endpointState == connectivity.Idle { // connection was shut down, so we need to create a new one endpoint.connection.Close() connected_ := connectEndpoint(cswp, ctx, endpoint) if !connected_ { continue } + } else if endpointState == connectivity.TransientFailure || endpointState == connectivity.Connecting { + // can't use this one right now, but we could in the future + continue } cswp.Endpoints[idx] = endpoint return true, endpoint, false diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index aff21d4951..d8d816fc2b 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -83,7 +83,7 @@ func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency tim if latency > 0 { baseLatency := po.baseWorldLatency + common.BaseTimePerCU(cu)/2 if isHangingApi { - baseLatency += po.averageBlockTime // hanging apis take longer + baseLatency += po.averageBlockTime / 2 // hanging apis take longer } providerData = po.updateProbeEntryLatency(providerData, latency, baseLatency, RELAY_UPDATE_WEIGHT, halfTime, sampleTime) } @@ -257,7 +257,7 @@ func (po *ProviderOptimizer) calculateSyncScore(syncScore score.ScoreStore) floa func (po *ProviderOptimizer) calculateLatencyScore(providerData ProviderData, cu uint64, requestedBlock int64) float64 { baseLatency := po.baseWorldLatency + common.BaseTimePerCU(cu)/2 // divide by two because the returned time is for timeout not for average - timeoutDuration := common.GetTimePerCu(cu) + timeoutDuration := common.GetTimePerCu(cu) + common.AverageWorldLatency var historicalLatency time.Duration if providerData.Latency.Denom == 0 { historicalLatency = baseLatency @@ -340,8 +340,8 @@ func (po *ProviderOptimizer) getProviderData(providerAddress string) (providerDa } else { providerData = ProviderData{ Availability: score.NewScoreStore(0.99, 1, time.Now().Add(-1*INITIAL_DATA_STALENESS*time.Hour)), // default value of 99% - Latency: score.NewScoreStore(2, 1, time.Now().Add(-1*INITIAL_DATA_STALENESS*time.Hour)), // default value of half score (twice the time) - Sync: score.NewScoreStore(2, 1, time.Now().Add(-1*INITIAL_DATA_STALENESS*time.Hour)), // default value of half score (twice the time) + Latency: score.NewScoreStore(1, 1, time.Now().Add(-1*INITIAL_DATA_STALENESS*time.Hour)), // default value of 1 score (encourage exploration) + Sync: score.NewScoreStore(1, 1, time.Now().Add(-1*INITIAL_DATA_STALENESS*time.Hour)), // default value of half score (encourage exploration) SyncBlock: 0, } } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 2187abc433..8e124cd5df 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -205,6 +205,7 @@ func (rpccs *RPCConsumerServer) SendRelay( if err != nil { return nil, err } + rpccs.HandleDirectiveHeadersForMessage(chainMessage, directiveHeaders) if _, ok := rpccs.consumerServices[chainlib.GetAddon(chainMessage)]; !ok { utils.LavaFormatError("unsupported addon usage, consumer policy does not allow", nil, utils.Attribute{Key: "addon", Value: chainlib.GetAddon(chainMessage)}, @@ -349,12 +350,6 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( chainID := rpccs.listenEndpoint.ChainID lavaChainID := rpccs.lavaChainID - // Calculate extra RelayTimeout - extraRelayTimeout := time.Duration(0) - if chainlib.IsHangingApi(chainMessage) { - _, extraRelayTimeout, _, _ = rpccs.chainParser.ChainBlockStats() - } - // Get Session. we get session here so we can use the epoch in the callbacks reqBlock, _ := chainMessage.RequestedBlock() if reqBlock == spectypes.LATEST_BLOCK && relayRequestData.SeenBlock != 0 { @@ -376,9 +371,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // Make a channel for all providers to send responses responses := make(chan *relayResponse, len(sessions)) - // Set relay timout, increase it every time we fail a relay on timeout - relayTimeout := extraRelayTimeout + time.Duration(timeouts+1)*common.GetTimePerCu(chainlib.GetComputeUnits(chainMessage)) + common.AverageWorldLatency - + relayTimeout := chainlib.GetRelayTimeout(chainMessage, rpccs.chainParser, timeouts) // Iterate over the sessions map for providerPublicAddress, sessionInfo := range sessions { // Launch a separate goroutine for each session @@ -723,9 +716,12 @@ func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Met metadataRet := []pairingtypes.Metadata{} headerDirectives := map[string]string{} for _, metaElement := range metadata { - switch metaElement.Name { - case common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME, strings.ToLower(common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME): - headerDirectives[common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME] = metaElement.Value + name := strings.ToLower(metaElement.Name) + switch name { + case common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME: + headerDirectives[name] = metaElement.Value + case common.RELAY_TIMEOUT_HEADER_NAME: + headerDirectives[name] = metaElement.Value default: metadataRet = append(metadataRet, metaElement) } @@ -744,3 +740,14 @@ func (rpccs *RPCConsumerServer) GetInitialUnwantedProviders(directiveHeaders map } return unwantedProviders } + +func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage chainlib.ChainMessage, directiveHeaders map[string]string) { + timeoutStr, ok := directiveHeaders[common.RELAY_TIMEOUT_HEADER_NAME] + if ok { + timeout, err := time.ParseDuration(timeoutStr) + if err == nil { + // set an override timeout + chainMessage.TimeoutOverride(timeout) + } + } +} diff --git a/x/protocol/types/params.go b/x/protocol/types/params.go index aeba8b5f40..23e2c40d90 100644 --- a/x/protocol/types/params.go +++ b/x/protocol/types/params.go @@ -12,8 +12,8 @@ import ( var _ paramtypes.ParamSet = (*Params)(nil) const ( - TARGET_VERSION = "0.27.2" - MIN_VERSION = "0.26.1" + TARGET_VERSION = "0.27.3" + MIN_VERSION = "0.27.0" ) var ( diff --git a/x/spec/types/api_collection.pb.go b/x/spec/types/api_collection.pb.go index 78cb3ccffa..9cecbab697 100644 --- a/x/spec/types/api_collection.pb.go +++ b/x/spec/types/api_collection.pb.go @@ -614,6 +614,7 @@ type Api struct { ExtraComputeUnits uint64 `protobuf:"varint,4,opt,name=extra_compute_units,json=extraComputeUnits,proto3" json:"extra_compute_units,omitempty"` Category SpecCategory `protobuf:"bytes,6,opt,name=category,proto3" json:"category"` BlockParsing BlockParser `protobuf:"bytes,7,opt,name=block_parsing,json=blockParsing,proto3" json:"block_parsing"` + TimeoutMs uint64 `protobuf:"varint,8,opt,name=timeout_ms,json=timeoutMs,proto3" json:"timeout_ms,omitempty"` } func (m *Api) Reset() { *m = Api{} } @@ -691,6 +692,13 @@ func (m *Api) GetBlockParsing() BlockParser { return BlockParser{} } +func (m *Api) GetTimeoutMs() uint64 { + if m != nil { + return m.TimeoutMs + } + return 0 +} + type ParseDirective struct { FunctionTag FUNCTION_TAG `protobuf:"varint,1,opt,name=function_tag,json=functionTag,proto3,enum=lavanet.lava.spec.FUNCTION_TAG" json:"function_tag,omitempty"` FunctionTemplate string `protobuf:"bytes,2,opt,name=function_template,json=functionTemplate,proto3" json:"function_template,omitempty"` @@ -926,12 +934,12 @@ func init() { } var fileDescriptor_c9f7567a181f534f = []byte{ - // 1335 bytes of a gzipped FileDescriptorProto + // 1351 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x4f, 0x6f, 0xdb, 0xc6, 0x13, 0x35, 0x25, 0xda, 0x96, 0x46, 0x7f, 0xcc, 0x6c, 0xf2, 0xcb, 0x4f, 0x49, 0x1d, 0x29, 0x65, 0xd2, 0x36, 0x70, 0x00, 0x1b, 0x4d, 0x50, 0xa0, 0x08, 0x0a, 0x14, 0x94, 0x44, 0x27, 0x6a, 0x6c, 0xc9, 0x58, 0xcb, 0x46, 0xdd, 0x0b, 0xb1, 0x22, 0xd7, 0xd2, 0x22, 0x14, 0xc9, 0x92, 0x4b, 0xc3, - 0xee, 0xb5, 0xb7, 0x9e, 0x7a, 0xea, 0x37, 0x28, 0x50, 0xa0, 0x40, 0x81, 0x7e, 0x8b, 0x1c, 0x73, + 0xee, 0xb5, 0xb7, 0x9e, 0xfa, 0x25, 0x0a, 0x14, 0x28, 0x50, 0xa0, 0x87, 0x7e, 0x87, 0x1c, 0x73, 0xec, 0x49, 0x28, 0x9c, 0x43, 0xd1, 0x1c, 0x73, 0x2f, 0x50, 0xec, 0x92, 0x92, 0x45, 0x47, 0x49, 0x91, 0x13, 0x39, 0x6f, 0xde, 0xbc, 0x9d, 0xd9, 0x9d, 0x1d, 0x12, 0x3e, 0x76, 0xc9, 0x09, 0xf1, 0x28, 0xdf, 0x12, 0xcf, 0xad, 0x28, 0xa0, 0xf6, 0x16, 0x09, 0x98, 0x65, 0xfb, 0xae, 0x4b, 0x6d, @@ -953,13 +961,13 @@ var fileDescriptor_c9f7567a181f534f = []byte{ 0xc5, 0x8e, 0xad, 0x71, 0xec, 0x72, 0x16, 0xb8, 0x8c, 0x86, 0xf2, 0xc8, 0x73, 0xb8, 0x6c, 0xc7, 0xbb, 0x33, 0x0c, 0xdd, 0x07, 0x35, 0x8c, 0x5d, 0x5a, 0xcb, 0xcb, 0x76, 0xf8, 0xff, 0x82, 0x1c, 0x70, 0xec, 0x52, 0x2c, 0x49, 0xfa, 0x3a, 0xa8, 0xc2, 0x42, 0xd7, 0x60, 0x79, 0xe0, 0xfa, 0xf6, - 0x33, 0xb9, 0x9c, 0x8a, 0x13, 0x43, 0xff, 0x59, 0x81, 0xf2, 0x7c, 0xc2, 0x0b, 0x93, 0xfa, 0x0a, + 0x33, 0xb9, 0x9c, 0x8a, 0x13, 0x43, 0xff, 0x49, 0x81, 0xf2, 0x7c, 0xc2, 0x0b, 0x93, 0xfa, 0x0a, 0xd6, 0x2e, 0x1d, 0xc4, 0x3b, 0x3a, 0xf1, 0xd2, 0x39, 0x54, 0xb3, 0xe7, 0x80, 0x3e, 0x83, 0x95, 0x13, 0xe2, 0xc6, 0x74, 0xda, 0x85, 0xb7, 0xde, 0x26, 0x71, 0x28, 0x58, 0x38, 0x25, 0xeb, 0xdf, 0x01, 0x5c, 0xa0, 0x68, 0x1d, 0x8a, 0xb3, 0xb3, 0x49, 0x33, 0xbd, 0x00, 0xd0, 0x47, 0x50, 0xa5, 0xa7, 0x01, 0xb5, 0x39, 0x75, 0x2c, 0x19, 0x2e, 0xb3, 0x2d, 0xe2, 0xca, 0x14, 0x4d, 0x44, 0x3e, 0x81, 0x35, 0x97, 0x70, 0x1a, 0x71, 0xcb, 0x61, 0x91, 0xec, 0x3a, 0xb9, 0xa1, 0x2a, 0xae, 0x26, - 0x70, 0x3b, 0x45, 0xf5, 0xdf, 0x73, 0x50, 0xcd, 0xf6, 0x2a, 0x3a, 0x84, 0x8a, 0x18, 0x04, 0xcc, + 0x70, 0x3b, 0x45, 0xf5, 0xdf, 0x72, 0x50, 0xcd, 0xf6, 0x2a, 0x3a, 0x84, 0x8a, 0x18, 0x04, 0xcc, 0xe3, 0x34, 0x3c, 0x26, 0x76, 0xba, 0x5d, 0xcd, 0x4f, 0x5f, 0x4d, 0x1a, 0x59, 0xc7, 0xeb, 0x49, 0x63, 0x7d, 0x4c, 0x82, 0x88, 0x87, 0xb1, 0xcd, 0xe3, 0x90, 0x3e, 0xd2, 0x33, 0x6e, 0x1d, 0x97, 0x49, 0xc0, 0x3a, 0x53, 0x53, 0xe8, 0x4a, 0x9f, 0x47, 0x5c, 0x2b, 0x20, 0x7c, 0x94, 0x64, 0x9e, @@ -973,44 +981,45 @@ var fileDescriptor_c9f7567a181f534f = []byte{ 0xe1, 0x3d, 0xdc, 0x3e, 0xe8, 0xb6, 0xfa, 0x9d, 0x5e, 0xd7, 0xea, 0x1b, 0x8f, 0x71, 0x69, 0x1a, 0xd4, 0x27, 0x43, 0xfd, 0x29, 0xc0, 0x85, 0x2e, 0xaa, 0x40, 0x31, 0x20, 0x51, 0x64, 0x45, 0xd4, 0x73, 0xb4, 0x25, 0x54, 0x05, 0x90, 0x66, 0x48, 0x03, 0xf7, 0x4c, 0x53, 0x66, 0xee, 0x81, 0xcf, - 0x47, 0x5a, 0x0e, 0xad, 0x41, 0x49, 0x9a, 0x6c, 0xe8, 0xf9, 0x21, 0xd5, 0xf2, 0xfa, 0x4f, 0x39, + 0x47, 0x5a, 0x0e, 0xad, 0x41, 0x49, 0x9a, 0x6c, 0xe8, 0xf9, 0x21, 0xd5, 0xf2, 0xfa, 0xef, 0x39, 0xc8, 0x1b, 0x01, 0x7b, 0xc7, 0x20, 0x9f, 0x6e, 0x40, 0xee, 0xd2, 0x3d, 0xf7, 0xc7, 0x41, 0xcc, 0xa9, 0x15, 0x7b, 0x8c, 0x47, 0x69, 0xeb, 0x95, 0x53, 0xf0, 0x40, 0x60, 0x68, 0x13, 0xae, 0xd2, 0x53, 0x1e, 0x12, 0x2b, 0x4b, 0x55, 0x25, 0xf5, 0x8a, 0x74, 0xb5, 0xe6, 0xf9, 0x06, 0x14, 0x6c, 0xc2, 0xe9, 0xd0, 0x0f, 0xcf, 0x6a, 0x2b, 0xf2, 0x82, 0x2e, 0xda, 0x97, 0xfd, 0x80, 0xda, 0xad, 0x94, 0x96, 0x7e, 0x28, 0x66, 0x61, 0xa8, 0x03, 0x15, 0x39, 0x18, 0x2c, 0x71, 0x6d, 0x99, 0x37, 0xac, 0xad, 0x4a, 0x9d, 0xfa, 0x02, 0x9d, 0xa6, 0xe0, 0xc9, 0x4b, 0x19, 0xa6, 0x32, 0xe5, 0xc1, - 0x14, 0x62, 0xde, 0x50, 0xff, 0x5b, 0x81, 0x6a, 0x76, 0x18, 0xbc, 0x71, 0x78, 0xca, 0xfb, 0x1f, - 0x1e, 0xba, 0x0f, 0x57, 0x2e, 0x34, 0xe8, 0x38, 0x10, 0x97, 0x35, 0xdd, 0x5a, 0x6d, 0xc6, 0x4b, - 0x71, 0xf4, 0x14, 0xaa, 0x21, 0x8d, 0x62, 0x97, 0xcf, 0xea, 0xc9, 0xbf, 0x47, 0x3d, 0x95, 0x24, - 0x36, 0x2d, 0x08, 0xdd, 0x80, 0x82, 0xb8, 0xbc, 0xf2, 0x2c, 0xe5, 0x8d, 0xc0, 0xab, 0x24, 0x60, - 0x5d, 0x32, 0xa6, 0xfa, 0x6f, 0x0a, 0x94, 0xe6, 0xe2, 0xd1, 0x2d, 0xd1, 0x44, 0xe2, 0xcd, 0x22, - 0xa1, 0x28, 0x33, 0x2f, 0x26, 0x54, 0x82, 0x18, 0xe1, 0x10, 0x7d, 0x29, 0x9a, 0x48, 0xba, 0x45, - 0xc6, 0xe9, 0x2d, 0x58, 0x94, 0xd3, 0x9e, 0x81, 0xf7, 0x4d, 0x6c, 0x89, 0xdd, 0xc0, 0xa9, 0xe2, - 0x76, 0xec, 0xd9, 0xa2, 0x7d, 0x1c, 0x7a, 0x4c, 0x44, 0x61, 0xc9, 0x84, 0x93, 0x17, 0x1b, 0x97, - 0x53, 0x30, 0x19, 0x70, 0x37, 0xa1, 0x40, 0x3d, 0xdb, 0x77, 0x44, 0xd9, 0x49, 0xbe, 0x33, 0x5b, - 0xff, 0x55, 0x81, 0xf2, 0x7c, 0x23, 0xa0, 0xbb, 0x42, 0x91, 0xd3, 0x70, 0xcc, 0x3c, 0x16, 0x71, - 0x66, 0xa7, 0x4d, 0x9c, 0x05, 0xc5, 0x47, 0xc4, 0xf5, 0x6d, 0xe2, 0xca, 0x94, 0x0b, 0x38, 0x31, - 0x90, 0x0e, 0xe5, 0x28, 0x1e, 0x44, 0x76, 0xc8, 0x02, 0xb1, 0xfb, 0x32, 0x99, 0x02, 0xce, 0x60, - 0x22, 0x99, 0x88, 0x13, 0x4e, 0x8f, 0x63, 0x57, 0x26, 0x53, 0xc1, 0x33, 0x1b, 0x35, 0xa0, 0x34, - 0x22, 0xde, 0x90, 0x79, 0x43, 0xf1, 0xcb, 0x50, 0x5b, 0x96, 0xe1, 0x90, 0x42, 0x46, 0xc0, 0x36, - 0x74, 0x28, 0x9a, 0x5f, 0xf7, 0xcd, 0xee, 0x7e, 0xa7, 0xd7, 0x45, 0x05, 0x50, 0xbb, 0xbd, 0xae, - 0xa9, 0x2d, 0xa1, 0x12, 0xac, 0x1a, 0xb8, 0xf5, 0xa4, 0x73, 0x68, 0x6a, 0xca, 0xc6, 0x0f, 0x0a, - 0x94, 0xe7, 0xbb, 0x06, 0x95, 0xa1, 0xd0, 0xee, 0xec, 0x1b, 0xcd, 0x1d, 0xb3, 0xad, 0x2d, 0x21, - 0x0d, 0xca, 0x8f, 0xcd, 0xbe, 0xd5, 0xdc, 0xe9, 0xb5, 0x9e, 0x76, 0x0f, 0x76, 0x35, 0x05, 0x5d, - 0x03, 0x6d, 0x86, 0x58, 0xcd, 0x23, 0x4b, 0xa0, 0x39, 0x74, 0x13, 0xae, 0xef, 0x9b, 0x7d, 0x6b, - 0xc7, 0xe8, 0x9b, 0xfb, 0x7d, 0xab, 0xd3, 0xb5, 0x76, 0xcd, 0xbe, 0xd1, 0x36, 0xfa, 0x86, 0x96, - 0x47, 0xd7, 0x01, 0x65, 0x7d, 0xcd, 0x5e, 0xfb, 0x48, 0x53, 0x85, 0xf6, 0xa1, 0x89, 0x3b, 0xdb, - 0x9d, 0x96, 0x21, 0x56, 0xd7, 0x96, 0x37, 0xbe, 0x57, 0xa0, 0x34, 0x77, 0x76, 0xa8, 0x08, 0xcb, - 0xe6, 0xee, 0x5e, 0xff, 0x28, 0x49, 0x44, 0x7a, 0xc4, 0x92, 0x06, 0x7e, 0xac, 0x29, 0xe8, 0x2a, - 0xac, 0x25, 0x48, 0xcb, 0xe8, 0xf6, 0xba, 0x9d, 0x96, 0xb1, 0xa3, 0xe5, 0x44, 0x76, 0x09, 0xd8, - 0xee, 0xc8, 0x92, 0x0c, 0x7c, 0xa4, 0xe5, 0x51, 0x03, 0x3e, 0xb8, 0x8c, 0x5a, 0x3d, 0x6c, 0xf5, - 0x70, 0xdb, 0xc4, 0x66, 0x5b, 0x53, 0xc5, 0x96, 0xb4, 0xcd, 0x6d, 0xe3, 0x60, 0xa7, 0xaf, 0xad, - 0x34, 0x9b, 0xbf, 0x9c, 0xd7, 0x95, 0xe7, 0xe7, 0x75, 0xe5, 0xc5, 0x79, 0x5d, 0xf9, 0xf3, 0xbc, - 0xae, 0xfc, 0xf8, 0xb2, 0xbe, 0xf4, 0xe2, 0x65, 0x7d, 0xe9, 0x8f, 0x97, 0xf5, 0xa5, 0x6f, 0xee, - 0x0e, 0x19, 0x1f, 0xc5, 0x83, 0x4d, 0xdb, 0x1f, 0x6f, 0x65, 0xfe, 0x73, 0x4f, 0x93, 0x3f, 0x5d, - 0xf1, 0x0d, 0x88, 0x06, 0x2b, 0xf2, 0xc7, 0xf5, 0xe1, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x61, - 0x6b, 0xc1, 0x70, 0x0b, 0x0b, 0x00, 0x00, + 0x14, 0x62, 0xde, 0x10, 0xdd, 0x02, 0xe0, 0x6c, 0x4c, 0xfd, 0x98, 0x5b, 0x63, 0x31, 0x2f, 0x45, + 0xd2, 0xc5, 0x14, 0xd9, 0x8d, 0xf4, 0xbf, 0x15, 0xa8, 0x66, 0x67, 0xc5, 0x1b, 0x67, 0xab, 0xbc, + 0xff, 0xd9, 0xa2, 0xfb, 0x70, 0xe5, 0x42, 0x83, 0x8e, 0x03, 0x71, 0x97, 0xd3, 0x9d, 0xd7, 0x66, + 0xbc, 0x14, 0x47, 0x4f, 0xa1, 0x1a, 0xd2, 0x28, 0x76, 0xf9, 0xac, 0xdc, 0xfc, 0x7b, 0x94, 0x5b, + 0x49, 0x62, 0xa7, 0xf5, 0xde, 0x80, 0x82, 0xb8, 0xdb, 0xf2, 0xa8, 0xe5, 0x85, 0xc1, 0xab, 0x24, + 0x60, 0x5d, 0x32, 0xa6, 0xfa, 0xaf, 0x0a, 0x94, 0xe6, 0xe2, 0xc5, 0xd6, 0xc8, 0xb1, 0x18, 0x5a, + 0x24, 0x14, 0x65, 0xe6, 0xc5, 0x00, 0x4b, 0x10, 0x23, 0x1c, 0xa2, 0x2f, 0x45, 0x8f, 0x49, 0xb7, + 0xc8, 0x38, 0xbd, 0x24, 0x8b, 0x72, 0xda, 0x33, 0xf0, 0xbe, 0x89, 0x2d, 0xb1, 0x1b, 0x38, 0x55, + 0xdc, 0x8e, 0x3d, 0x5b, 0x74, 0x97, 0x43, 0x8f, 0x89, 0x28, 0x2c, 0x19, 0x80, 0xf2, 0xde, 0xe3, + 0x72, 0x0a, 0x26, 0xf3, 0xef, 0x26, 0x14, 0xa8, 0x67, 0xfb, 0x8e, 0x28, 0x3b, 0xc9, 0x77, 0x66, + 0xeb, 0xbf, 0x28, 0x50, 0x9e, 0xef, 0x13, 0x74, 0x57, 0x28, 0x72, 0x1a, 0x8e, 0x99, 0xc7, 0x22, + 0xce, 0xec, 0xb4, 0xc7, 0xb3, 0xa0, 0xf8, 0xc6, 0xb8, 0xbe, 0x4d, 0x5c, 0x99, 0x72, 0x01, 0x27, + 0x06, 0xd2, 0xa1, 0x1c, 0xc5, 0x83, 0xc8, 0x0e, 0x59, 0x20, 0x76, 0x5f, 0x26, 0x53, 0xc0, 0x19, + 0x4c, 0x24, 0x13, 0x71, 0xc2, 0xe9, 0x71, 0xec, 0xca, 0x64, 0x2a, 0x78, 0x66, 0xa3, 0x06, 0x94, + 0x46, 0xc4, 0x1b, 0x32, 0x6f, 0x28, 0xfe, 0x28, 0x6a, 0xcb, 0x32, 0x1c, 0x52, 0xc8, 0x08, 0xd8, + 0x86, 0x0e, 0x45, 0xf3, 0xeb, 0xbe, 0xd9, 0xdd, 0xef, 0xf4, 0xba, 0xa8, 0x00, 0x6a, 0xb7, 0xd7, + 0x35, 0xb5, 0x25, 0x54, 0x82, 0x55, 0x03, 0xb7, 0x9e, 0x74, 0x0e, 0x4d, 0x4d, 0xd9, 0xf8, 0x41, + 0x81, 0xf2, 0x7c, 0xd7, 0xa0, 0x32, 0x14, 0xda, 0x9d, 0x7d, 0xa3, 0xb9, 0x63, 0xb6, 0xb5, 0x25, + 0xa4, 0x41, 0xf9, 0xb1, 0xd9, 0xb7, 0x9a, 0x3b, 0xbd, 0xd6, 0xd3, 0xee, 0xc1, 0xae, 0xa6, 0xa0, + 0x6b, 0xa0, 0xcd, 0x10, 0xab, 0x79, 0x64, 0x09, 0x34, 0x87, 0x6e, 0xc2, 0xf5, 0x7d, 0xb3, 0x6f, + 0xed, 0x18, 0x7d, 0x73, 0xbf, 0x6f, 0x75, 0xba, 0xd6, 0xae, 0xd9, 0x37, 0xda, 0x46, 0xdf, 0xd0, + 0xf2, 0xe8, 0x3a, 0xa0, 0xac, 0xaf, 0xd9, 0x6b, 0x1f, 0x69, 0xaa, 0xd0, 0x3e, 0x34, 0x71, 0x67, + 0xbb, 0xd3, 0x32, 0xc4, 0xea, 0xda, 0xf2, 0xc6, 0xf7, 0x0a, 0x94, 0xe6, 0xce, 0x0e, 0x15, 0x61, + 0xd9, 0xdc, 0xdd, 0xeb, 0x1f, 0x25, 0x89, 0x48, 0x8f, 0x58, 0xd2, 0xc0, 0x8f, 0x35, 0x05, 0x5d, + 0x85, 0xb5, 0x04, 0x69, 0x19, 0xdd, 0x5e, 0xb7, 0xd3, 0x32, 0x76, 0xb4, 0x9c, 0xc8, 0x2e, 0x01, + 0xdb, 0x1d, 0x59, 0x92, 0x81, 0x8f, 0xb4, 0x3c, 0x6a, 0xc0, 0x07, 0x97, 0x51, 0xab, 0x87, 0xad, + 0x1e, 0x6e, 0x9b, 0xd8, 0x6c, 0x6b, 0xaa, 0xd8, 0x92, 0xb6, 0xb9, 0x6d, 0x1c, 0xec, 0xf4, 0xb5, + 0x95, 0x66, 0xf3, 0xe7, 0xf3, 0xba, 0xf2, 0xfc, 0xbc, 0xae, 0xbc, 0x38, 0xaf, 0x2b, 0x7f, 0x9e, + 0xd7, 0x95, 0x1f, 0x5f, 0xd6, 0x97, 0x5e, 0xbc, 0xac, 0x2f, 0xfd, 0xf1, 0xb2, 0xbe, 0xf4, 0xcd, + 0xdd, 0x21, 0xe3, 0xa3, 0x78, 0xb0, 0x69, 0xfb, 0xe3, 0xad, 0xcc, 0x6f, 0xf0, 0x69, 0xf2, 0x23, + 0x2c, 0x3e, 0x11, 0xd1, 0x60, 0x45, 0xfe, 0xd7, 0x3e, 0xfc, 0x37, 0x00, 0x00, 0xff, 0xff, 0x4e, + 0x61, 0xe2, 0xf3, 0x2a, 0x0b, 0x00, 0x00, } func (this *ApiCollection) Equal(that interface{}) bool { @@ -1307,6 +1316,9 @@ func (this *Api) Equal(that interface{}) bool { if !this.BlockParsing.Equal(&that1.BlockParsing) { return false } + if this.TimeoutMs != that1.TimeoutMs { + return false + } return true } func (this *ParseDirective) Equal(that interface{}) bool { @@ -1828,6 +1840,11 @@ func (m *Api) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.TimeoutMs != 0 { + i = encodeVarintApiCollection(dAtA, i, uint64(m.TimeoutMs)) + i-- + dAtA[i] = 0x40 + } { size, err := m.BlockParsing.MarshalToSizedBuffer(dAtA[:i]) if err != nil { @@ -2252,6 +2269,9 @@ func (m *Api) Size() (n int) { n += 1 + l + sovApiCollection(uint64(l)) l = m.BlockParsing.Size() n += 1 + l + sovApiCollection(uint64(l)) + if m.TimeoutMs != 0 { + n += 1 + sovApiCollection(uint64(m.TimeoutMs)) + } return n } @@ -3606,6 +3626,25 @@ func (m *Api) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimeoutMs", wireType) + } + m.TimeoutMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApiCollection + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimeoutMs |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipApiCollection(dAtA[iNdEx:])